diff options
author | kungurtsev <kungasc@ydb.tech> | 2025-07-17 17:41:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-17 18:41:58 +0300 |
commit | 2dd146e30258a3466424347b44fcea86dd86b29e (patch) | |
tree | dea88c3254eb5c354d2400c73839fff5bd5701ea | |
parent | 4b68bfb40d7d280a34fcb0a4228cacdd8324091f (diff) | |
download | ydb-2dd146e30258a3466424347b44fcea86dd86b29e.tar.gz |
Add Shared Cache tx tests (#21311)
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp | 433 |
1 files changed, 361 insertions, 72 deletions
diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp index 1337eb09d21..8b0b6d2ee15 100644 --- a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp @@ -3,13 +3,12 @@ #include <util/system/sanitizers.h> #include <ydb/core/base/counters.h> #include <ydb/core/cms/console/console.h> +#include <ydb/core/testlib/actors/block_events.h> namespace NKikimr::NSharedCache { using namespace NTabletFlatExecutor; -Y_UNIT_TEST_SUITE(TSharedPageCache) { - enum : ui32 { TableId = 101, KeyColumnId = 1, @@ -31,15 +30,14 @@ struct TTxInitSchema : public ITransaction { if (txc.DB.GetScheme().GetTableInfo(TableId)) return true; - TCompactionPolicy policy; - policy.MinBTreeIndexNodeSize = 128; + CompactionPolicy->MinBTreeIndexNodeSize = 128; txc.DB.Alter() .AddTable("test" + ToString(ui32(TableId)), TableId) .AddColumn(TableId, "key", KeyColumnId, NScheme::TInt64::TypeId, false) .AddColumn(TableId, "value", ValueColumnId, NScheme::TString::TypeId, false) .AddColumnToKey(TableId, KeyColumnId) - .SetCompactionPolicy(TableId, policy); + .SetCompactionPolicy(TableId, *CompactionPolicy); return true; } @@ -47,6 +45,8 @@ struct TTxInitSchema : public ITransaction { void Complete(const TActorContext& ctx) override { ctx.Send(ctx.SelfID, new NFake::TEvReturn); } + + NLocalDb::TCompactionPolicyPtr CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy(); }; struct TTxWriteRow : public ITransaction { @@ -74,39 +74,57 @@ struct TTxWriteRow : public ITransaction { } }; -struct TTxReadRow : public ITransaction { - i64 Key; +struct TTxReadRows : public ITransaction { + TVector<i64> Keys; TRetriedCounters& Retried; ui32 Attempts = 0; + bool& Completed; - explicit TTxReadRow(i64 key, TRetriedCounters& retried) - : Key(key) + TTxReadRows(TVector<i64> keys, TRetriedCounters& retried, bool& completed) + : Keys(keys) , Retried(retried) + , Completed(completed) + { + Completed = false; + } + + TTxReadRows(TVector<i64> keys, TRetriedCounters& retried) + : TTxReadRows(keys, retried, Completed_) + { } + + TTxReadRows(i64 key, TRetriedCounters& retried) + : TTxReadRows(TVector<i64>{key}, retried) { } bool Execute(TTransactionContext& txc, const TActorContext&) override { Increment(Retried, Attempts); Attempts++; - TVector<TRawTypeValue> rawKey; - rawKey.emplace_back(&Key, sizeof(Key), NScheme::TInt64::TypeId); + bool ready = true; + for (auto key : Keys) { + TVector<TRawTypeValue> rawKey; + rawKey.emplace_back(&key, sizeof(key), NScheme::TInt64::TypeId); - TVector<NTable::TTag> tags; - tags.push_back(KeyColumnId); - tags.push_back(ValueColumnId); + TVector<NTable::TTag> tags; + tags.push_back(KeyColumnId); + tags.push_back(ValueColumnId); - NTable::TRowState row; - auto ready = txc.DB.Select(TableId, rawKey, tags, row); - if (ready == NTable::EReady::Page) { - return false; + NTable::TRowState row; + if (txc.DB.Select(TableId, rawKey, tags, row) == NTable::EReady::Page) { + ready = false; + } } - return true; + return ready; } - void Complete(const TActorContext&ctx) override { + void Complete(const TActorContext& ctx) override { + Completed = true; ctx.Send(ctx.SelfID, new NFake::TEvReturn); } + +private: + bool Completed_; }; THolder<TSharedPageCacheCounters> GetSharedPageCounters(TMyEnvBase& env) { @@ -151,6 +169,8 @@ void SetupSharedCache(TMyEnvBase& env, NKikimrSharedCache::TReplacementPolicy po } } +Y_UNIT_TEST_SUITE(TSharedPageCache) { + Y_UNIT_TEST(Limits) { TMyEnvBase env; env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE); @@ -179,7 +199,7 @@ Y_UNIT_TEST(Limits) { TRetriedCounters retried; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0); @@ -247,7 +267,7 @@ Y_UNIT_TEST(Limits_Config) { TRetriedCounters retried; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0); @@ -315,7 +335,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -325,7 +345,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging) @@ -333,7 +353,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -341,7 +361,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -351,16 +371,16 @@ Y_UNIT_TEST(ThreeLeveledLRU) { // read some key twice retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1})); retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1})); // simulate scan retried = {}; for (i64 key = 1; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging) @@ -368,15 +388,15 @@ Y_UNIT_TEST(ThreeLeveledLRU) { // read the key again retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1})); RestartAndClearCache(env); retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging) @@ -384,7 +404,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging) @@ -420,7 +440,7 @@ Y_UNIT_TEST(S3FIFO) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -430,7 +450,7 @@ Y_UNIT_TEST(S3FIFO) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -438,7 +458,7 @@ Y_UNIT_TEST(S3FIFO) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -446,7 +466,7 @@ Y_UNIT_TEST(S3FIFO) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -456,16 +476,16 @@ Y_UNIT_TEST(S3FIFO) { // read some key twice retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1})); retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1})); // simulate scan retried = {}; for (i64 key = 1; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -473,15 +493,15 @@ Y_UNIT_TEST(S3FIFO) { // read the key again retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1})); RestartAndClearCache(env); retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -489,7 +509,7 @@ Y_UNIT_TEST(S3FIFO) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -525,7 +545,7 @@ Y_UNIT_TEST(ClockPro) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -535,7 +555,7 @@ Y_UNIT_TEST(ClockPro) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -543,7 +563,7 @@ Y_UNIT_TEST(ClockPro) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -551,7 +571,7 @@ Y_UNIT_TEST(ClockPro) { retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -561,16 +581,16 @@ Y_UNIT_TEST(ClockPro) { // read some key twice retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1})); retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1})); // simulate scan retried = {}; for (i64 key = 1; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -578,13 +598,13 @@ Y_UNIT_TEST(ClockPro) { // read the key again retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1})); // simulate scan again retried = {}; for (i64 key = 1; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -592,15 +612,15 @@ Y_UNIT_TEST(ClockPro) { // read the key again again retried = {}; - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1})); RestartAndClearCache(env); retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -608,7 +628,7 @@ Y_UNIT_TEST(ClockPro) { retried = {}; for (i64 key = 0; key < 100; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3)); @@ -642,7 +662,7 @@ Y_UNIT_TEST(ReplacementPolicySwitch) { TRetriedCounters retried = {}; for (i64 key = 0; key < 3; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3, 3, 1, 1})); @@ -653,13 +673,13 @@ Y_UNIT_TEST(ReplacementPolicySwitch) { retried = {}; for (i64 key = 0; key < 3; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3})); retried = {}; for (i64 key = 90; key < 93; ++key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3, 3, 2, 1})); @@ -703,7 +723,7 @@ Y_UNIT_TEST(BigCache_BTreeIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100})); @@ -720,7 +740,7 @@ Y_UNIT_TEST(BigCache_BTreeIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 14, 2})); @@ -770,7 +790,7 @@ Y_UNIT_TEST(BigCache_FlatIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100})); @@ -787,7 +807,7 @@ Y_UNIT_TEST(BigCache_FlatIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100})); @@ -837,7 +857,7 @@ Y_UNIT_TEST(MiddleCache_BTreeIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 19, 2})); @@ -854,7 +874,7 @@ Y_UNIT_TEST(MiddleCache_BTreeIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 14, 2})); @@ -904,7 +924,7 @@ Y_UNIT_TEST(MiddleCache_FlatIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 19})); @@ -921,7 +941,7 @@ Y_UNIT_TEST(MiddleCache_FlatIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100})); @@ -971,7 +991,7 @@ Y_UNIT_TEST(ZeroCache_BTreeIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 100, 100, 100})); @@ -988,7 +1008,7 @@ Y_UNIT_TEST(ZeroCache_BTreeIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 100, 100, 100})); @@ -1038,7 +1058,7 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) { TRetriedCounters retried; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100})); @@ -1055,7 +1075,7 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) { LogCounters(counters); retried = {}; for (i64 key = 99; key >= 0; --key) { - env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true); + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true); } LogCounters(counters); UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100})); @@ -1071,4 +1091,273 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) { } +Y_UNIT_TEST_SUITE(TSharedPageCache_WaitPads) { + +void BasicSetup(TMyEnvBase& env) { + env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE); + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_TRACE); + env->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE); + + env->GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(true); + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() }); + + SetupSharedCache(env, NKikimrSharedCache::S3FIFO, 0_MB, true); + + // write 100 rows, each ~100KB (~10MB) + for (i64 key = 0; key < 100; ++key) { + TString value(size_t(100 * 1024), char('a' + key % 26)); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) }); + } + + Cerr << "...compacting" << Endl; + env.SendSync(new NFake::TEvCompact(TableId)); + Cerr << "...waiting until compacted" << Endl; + env.WaitFor<NFake::TEvCompacted>(); +} + +void ManyPartsSetup(TMyEnvBase& env) { + env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE); + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_TRACE); + env->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE); + + env->GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + TAutoPtr<TTxInitSchema> initSchema = new TTxInitSchema(); + initSchema->CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy(); + initSchema->CompactionPolicy->InMemForceStepsToSnapshot = 1; + for (auto& gen : initSchema->CompactionPolicy->Generations) { + gen.ExtraCompactionPercent = 0; + gen.ExtraCompactionMinSize = 100; + gen.ExtraCompactionExpPercent = 0; + gen.ExtraCompactionExpMaxSize = 0; + gen.UpliftPartSize = 0; + } + env.SendSync(new NFake::TEvExecute{ initSchema }); + + SetupSharedCache(env, NKikimrSharedCache::S3FIFO, 0_MB, true); + + // write 100 rows, each ~100KB (~10MB) + for (i64 key = 0; key < 100; ++key) { + TString value(size_t(100 * 1024), char('a' + key % 26)); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) }); + } +} + +Y_UNIT_TEST(One_Transaction_One_Key) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + BasicSetup(env); + + Cerr << "...making read" << Endl; + TRetriedCounters retried; + env.SendSync(new NFake::TEvExecute{ new TTxReadRows(33, retried) }); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 4); +} + +Y_UNIT_TEST(One_Transaction_Two_Keys) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + BasicSetup(env); + + Cerr << "...making read" << Endl; + TRetriedCounters retried; + env.SendSync(new NFake::TEvExecute{ new TTxReadRows({33, 66}, retried) }); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 7); +} + +Y_UNIT_TEST(One_Transaction_Two_Keys_Many_Parts) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + ManyPartsSetup(env); + + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 96); + + Cerr << "...making read" << Endl; + TRetriedCounters retried; + env.SendSync(new NFake::TEvExecute{ new TTxReadRows({33, 66}, retried) }); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1})); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 98); +} + +Y_UNIT_TEST(Two_Transactions_One_Key) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + BasicSetup(env); + + Cerr << "...making read" << Endl; + + TBlockEvents<NSharedCache::TEvRequest> block(env.Env); + + TRetriedCounters retried1, retried2; + bool completed1, completed2; + env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried1, completed1) }); + env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried2, completed2) }); + + // CHANGE LATER: block.size() == 2 + env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0); + + block.Unblock(); + // CHANGE LATER: block.size() == 2 + env.Env.WaitFor("blocked TEvRequest 2", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 2 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 1); + + block.Unblock(); + // CHANGE LATER: block.size() == 2 + env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 4 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 2); + + block.Unblock(); + // CHANGE LATER: block.size() == 2 + env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 6 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 3); + + block.Unblock(); + env.Env.WaitFor("blocked TEvRequest 4", [&]{ return completed1 && completed2; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT(block.empty()); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1, 1})); + // CHANGE LATER: counters->CacheMissPages->Val() == 8 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 4); +} + +Y_UNIT_TEST(Two_Transactions_Two_Keys) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + BasicSetup(env); + + Cerr << "...making read" << Endl; + + TBlockEvents<NSharedCache::TEvRequest> block(env.Env); + + TRetriedCounters retried1, retried2; + bool completed1, completed2; + env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried1, completed1) }); + env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({66}, retried2, completed2) }); + + // b-tree index root is common page: + // CHANGE LATER: block.size() == 2 + env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0); + + block.Unblock(); + env.Env.WaitFor("blocked TEvRequest 2", [&]{ return block.size() == 2; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 1 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 1); + + block.Unblock(); + env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 2; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 4 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 3); + + block.Unblock(); + env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 2; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(completed1, false); + UNIT_ASSERT_VALUES_EQUAL(completed2, false); + // CHANGE LATER: counters->CacheMissPages->Val() == 6 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 5); + + block.Unblock(); + env.Env.WaitFor("blocked TEvRequest 4", [&]{ return completed1 && completed2; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT(block.empty()); + UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1, 1})); + // CHANGE LATER: counters->CacheMissPages->Val() == 8 + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 7); +} + +Y_UNIT_TEST(Compaction) { + TMyEnvBase env; + auto counters = GetSharedPageCounters(env); + + BasicSetup(env); + + Cerr << "...making read" << Endl; + + TBlockEvents<NSharedCache::TEvRequest> block(env.Env); + + TRetriedCounters retried; + bool completed; + env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried, completed) }); + + env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1})); + UNIT_ASSERT_VALUES_EQUAL(completed, false); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0); + + block.Stop(); + + Cerr << "...compacting" << Endl; + env.SendSync(new NFake::TEvCompact(TableId)); + Cerr << "...waiting until compacted" << Endl; + env.WaitFor<NFake::TEvCompacted>(); + + // page collection invalidation activates waiting transactions: + env.Env.WaitFor("blocked TEvRequest 2", [&]{ return completed; }, TDuration::Seconds(10)); + LogCounters(counters); + UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1, 1})); + UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 121); + + block.Unblock(); + env->SimulateSleep(TDuration::Seconds(3)); + // nothing should crash +} + +} + } |