aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungurtsev <kungasc@ydb.tech>2025-07-17 17:41:58 +0200
committerGitHub <noreply@github.com>2025-07-17 18:41:58 +0300
commit2dd146e30258a3466424347b44fcea86dd86b29e (patch)
treedea88c3254eb5c354d2400c73839fff5bd5701ea
parent4b68bfb40d7d280a34fcb0a4228cacdd8324091f (diff)
downloadydb-2dd146e30258a3466424347b44fcea86dd86b29e.tar.gz
Add Shared Cache tx tests (#21311)
-rw-r--r--ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp433
1 files changed, 361 insertions, 72 deletions
diff --git a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
index 1337eb09d21..8b0b6d2ee15 100644
--- a/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/ut/ut_shared_sausagecache.cpp
@@ -3,13 +3,12 @@
#include <util/system/sanitizers.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/cms/console/console.h>
+#include <ydb/core/testlib/actors/block_events.h>
namespace NKikimr::NSharedCache {
using namespace NTabletFlatExecutor;
-Y_UNIT_TEST_SUITE(TSharedPageCache) {
-
enum : ui32 {
TableId = 101,
KeyColumnId = 1,
@@ -31,15 +30,14 @@ struct TTxInitSchema : public ITransaction {
if (txc.DB.GetScheme().GetTableInfo(TableId))
return true;
- TCompactionPolicy policy;
- policy.MinBTreeIndexNodeSize = 128;
+ CompactionPolicy->MinBTreeIndexNodeSize = 128;
txc.DB.Alter()
.AddTable("test" + ToString(ui32(TableId)), TableId)
.AddColumn(TableId, "key", KeyColumnId, NScheme::TInt64::TypeId, false)
.AddColumn(TableId, "value", ValueColumnId, NScheme::TString::TypeId, false)
.AddColumnToKey(TableId, KeyColumnId)
- .SetCompactionPolicy(TableId, policy);
+ .SetCompactionPolicy(TableId, *CompactionPolicy);
return true;
}
@@ -47,6 +45,8 @@ struct TTxInitSchema : public ITransaction {
void Complete(const TActorContext& ctx) override {
ctx.Send(ctx.SelfID, new NFake::TEvReturn);
}
+
+ NLocalDb::TCompactionPolicyPtr CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy();
};
struct TTxWriteRow : public ITransaction {
@@ -74,39 +74,57 @@ struct TTxWriteRow : public ITransaction {
}
};
-struct TTxReadRow : public ITransaction {
- i64 Key;
+struct TTxReadRows : public ITransaction {
+ TVector<i64> Keys;
TRetriedCounters& Retried;
ui32 Attempts = 0;
+ bool& Completed;
- explicit TTxReadRow(i64 key, TRetriedCounters& retried)
- : Key(key)
+ TTxReadRows(TVector<i64> keys, TRetriedCounters& retried, bool& completed)
+ : Keys(keys)
, Retried(retried)
+ , Completed(completed)
+ {
+ Completed = false;
+ }
+
+ TTxReadRows(TVector<i64> keys, TRetriedCounters& retried)
+ : TTxReadRows(keys, retried, Completed_)
+ { }
+
+ TTxReadRows(i64 key, TRetriedCounters& retried)
+ : TTxReadRows(TVector<i64>{key}, retried)
{ }
bool Execute(TTransactionContext& txc, const TActorContext&) override {
Increment(Retried, Attempts);
Attempts++;
- TVector<TRawTypeValue> rawKey;
- rawKey.emplace_back(&Key, sizeof(Key), NScheme::TInt64::TypeId);
+ bool ready = true;
+ for (auto key : Keys) {
+ TVector<TRawTypeValue> rawKey;
+ rawKey.emplace_back(&key, sizeof(key), NScheme::TInt64::TypeId);
- TVector<NTable::TTag> tags;
- tags.push_back(KeyColumnId);
- tags.push_back(ValueColumnId);
+ TVector<NTable::TTag> tags;
+ tags.push_back(KeyColumnId);
+ tags.push_back(ValueColumnId);
- NTable::TRowState row;
- auto ready = txc.DB.Select(TableId, rawKey, tags, row);
- if (ready == NTable::EReady::Page) {
- return false;
+ NTable::TRowState row;
+ if (txc.DB.Select(TableId, rawKey, tags, row) == NTable::EReady::Page) {
+ ready = false;
+ }
}
- return true;
+ return ready;
}
- void Complete(const TActorContext&ctx) override {
+ void Complete(const TActorContext& ctx) override {
+ Completed = true;
ctx.Send(ctx.SelfID, new NFake::TEvReturn);
}
+
+private:
+ bool Completed_;
};
THolder<TSharedPageCacheCounters> GetSharedPageCounters(TMyEnvBase& env) {
@@ -151,6 +169,8 @@ void SetupSharedCache(TMyEnvBase& env, NKikimrSharedCache::TReplacementPolicy po
}
}
+Y_UNIT_TEST_SUITE(TSharedPageCache) {
+
Y_UNIT_TEST(Limits) {
TMyEnvBase env;
env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE);
@@ -179,7 +199,7 @@ Y_UNIT_TEST(Limits) {
TRetriedCounters retried;
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0);
@@ -247,7 +267,7 @@ Y_UNIT_TEST(Limits_Config) {
TRetriedCounters retried;
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(counters->LoadInFlyBytes->Val(), 0);
@@ -315,7 +335,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -325,7 +345,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging)
@@ -333,7 +353,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -341,7 +361,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -351,16 +371,16 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
// read some key twice
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1}));
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1}));
// simulate scan
retried = {};
for (i64 key = 1; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging)
@@ -368,15 +388,15 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
// read the key again
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1}));
RestartAndClearCache(env);
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging)
@@ -384,7 +404,7 @@ Y_UNIT_TEST(ThreeLeveledLRU) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB / 3 * 2), static_cast<i64>(1_MB / 3)); // 2 full layers (fresh & staging)
@@ -420,7 +440,7 @@ Y_UNIT_TEST(S3FIFO) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -430,7 +450,7 @@ Y_UNIT_TEST(S3FIFO) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -438,7 +458,7 @@ Y_UNIT_TEST(S3FIFO) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -446,7 +466,7 @@ Y_UNIT_TEST(S3FIFO) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -456,16 +476,16 @@ Y_UNIT_TEST(S3FIFO) {
// read some key twice
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1}));
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1}));
// simulate scan
retried = {};
for (i64 key = 1; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -473,15 +493,15 @@ Y_UNIT_TEST(S3FIFO) {
// read the key again
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1}));
RestartAndClearCache(env);
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -489,7 +509,7 @@ Y_UNIT_TEST(S3FIFO) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -525,7 +545,7 @@ Y_UNIT_TEST(ClockPro) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -535,7 +555,7 @@ Y_UNIT_TEST(ClockPro) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -543,7 +563,7 @@ Y_UNIT_TEST(ClockPro) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -551,7 +571,7 @@ Y_UNIT_TEST(ClockPro) {
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -561,16 +581,16 @@ Y_UNIT_TEST(ClockPro) {
// read some key twice
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1}));
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1}));
// simulate scan
retried = {};
for (i64 key = 1; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -578,13 +598,13 @@ Y_UNIT_TEST(ClockPro) {
// read the key again
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1}));
// simulate scan again
retried = {};
for (i64 key = 1; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -592,15 +612,15 @@ Y_UNIT_TEST(ClockPro) {
// read the key again again
retried = {};
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(0, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(0, retried) }, true);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1}));
RestartAndClearCache(env);
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -608,7 +628,7 @@ Y_UNIT_TEST(ClockPro) {
retried = {};
for (i64 key = 0; key < 100; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_DOUBLES_EQUAL(counters->ActiveBytes->Val(), static_cast<i64>(8_MB), static_cast<i64>(1_MB / 3));
@@ -642,7 +662,7 @@ Y_UNIT_TEST(ReplacementPolicySwitch) {
TRetriedCounters retried = {};
for (i64 key = 0; key < 3; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3, 3, 1, 1}));
@@ -653,13 +673,13 @@ Y_UNIT_TEST(ReplacementPolicySwitch) {
retried = {};
for (i64 key = 0; key < 3; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3}));
retried = {};
for (i64 key = 90; key < 93; ++key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{3, 3, 2, 1}));
@@ -703,7 +723,7 @@ Y_UNIT_TEST(BigCache_BTreeIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100}));
@@ -720,7 +740,7 @@ Y_UNIT_TEST(BigCache_BTreeIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 14, 2}));
@@ -770,7 +790,7 @@ Y_UNIT_TEST(BigCache_FlatIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100}));
@@ -787,7 +807,7 @@ Y_UNIT_TEST(BigCache_FlatIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100}));
@@ -837,7 +857,7 @@ Y_UNIT_TEST(MiddleCache_BTreeIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 19, 2}));
@@ -854,7 +874,7 @@ Y_UNIT_TEST(MiddleCache_BTreeIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 14, 2}));
@@ -904,7 +924,7 @@ Y_UNIT_TEST(MiddleCache_FlatIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 19}));
@@ -921,7 +941,7 @@ Y_UNIT_TEST(MiddleCache_FlatIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100}));
@@ -971,7 +991,7 @@ Y_UNIT_TEST(ZeroCache_BTreeIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 100, 100, 100}));
@@ -988,7 +1008,7 @@ Y_UNIT_TEST(ZeroCache_BTreeIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100, 100, 100, 100}));
@@ -1038,7 +1058,7 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) {
TRetriedCounters retried;
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) });
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) });
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100}));
@@ -1055,7 +1075,7 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) {
LogCounters(counters);
retried = {};
for (i64 key = 99; key >= 0; --key) {
- env.SendSync(new NFake::TEvExecute{ new TTxReadRow(key, retried) }, true);
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(key, retried) }, true);
}
LogCounters(counters);
UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{100, 100}));
@@ -1071,4 +1091,273 @@ Y_UNIT_TEST(ZeroCache_FlatIndex) {
}
+Y_UNIT_TEST_SUITE(TSharedPageCache_WaitPads) {
+
+void BasicSetup(TMyEnvBase& env) {
+ env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE);
+ env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_TRACE);
+ env->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
+
+ env->GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(true);
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+ env.SendSync(new NFake::TEvExecute{ new TTxInitSchema() });
+
+ SetupSharedCache(env, NKikimrSharedCache::S3FIFO, 0_MB, true);
+
+ // write 100 rows, each ~100KB (~10MB)
+ for (i64 key = 0; key < 100; ++key) {
+ TString value(size_t(100 * 1024), char('a' + key % 26));
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) });
+ }
+
+ Cerr << "...compacting" << Endl;
+ env.SendSync(new NFake::TEvCompact(TableId));
+ Cerr << "...waiting until compacted" << Endl;
+ env.WaitFor<NFake::TEvCompacted>();
+}
+
+void ManyPartsSetup(TMyEnvBase& env) {
+ env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE);
+ env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_TRACE);
+ env->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
+
+ env->GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+ TAutoPtr<TTxInitSchema> initSchema = new TTxInitSchema();
+ initSchema->CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy();
+ initSchema->CompactionPolicy->InMemForceStepsToSnapshot = 1;
+ for (auto& gen : initSchema->CompactionPolicy->Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 100;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ gen.UpliftPartSize = 0;
+ }
+ env.SendSync(new NFake::TEvExecute{ initSchema });
+
+ SetupSharedCache(env, NKikimrSharedCache::S3FIFO, 0_MB, true);
+
+ // write 100 rows, each ~100KB (~10MB)
+ for (i64 key = 0; key < 100; ++key) {
+ TString value(size_t(100 * 1024), char('a' + key % 26));
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow(key, std::move(value)) });
+ }
+}
+
+Y_UNIT_TEST(One_Transaction_One_Key) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ BasicSetup(env);
+
+ Cerr << "...making read" << Endl;
+ TRetriedCounters retried;
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows(33, retried) });
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 4);
+}
+
+Y_UNIT_TEST(One_Transaction_Two_Keys) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ BasicSetup(env);
+
+ Cerr << "...making read" << Endl;
+ TRetriedCounters retried;
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows({33, 66}, retried) });
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 7);
+}
+
+Y_UNIT_TEST(One_Transaction_Two_Keys_Many_Parts) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ ManyPartsSetup(env);
+
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 96);
+
+ Cerr << "...making read" << Endl;
+ TRetriedCounters retried;
+ env.SendSync(new NFake::TEvExecute{ new TTxReadRows({33, 66}, retried) });
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 98);
+}
+
+Y_UNIT_TEST(Two_Transactions_One_Key) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ BasicSetup(env);
+
+ Cerr << "...making read" << Endl;
+
+ TBlockEvents<NSharedCache::TEvRequest> block(env.Env);
+
+ TRetriedCounters retried1, retried2;
+ bool completed1, completed2;
+ env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried1, completed1) });
+ env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried2, completed2) });
+
+ // CHANGE LATER: block.size() == 2
+ env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0);
+
+ block.Unblock();
+ // CHANGE LATER: block.size() == 2
+ env.Env.WaitFor("blocked TEvRequest 2", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 2
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 1);
+
+ block.Unblock();
+ // CHANGE LATER: block.size() == 2
+ env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 4
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 2);
+
+ block.Unblock();
+ // CHANGE LATER: block.size() == 2
+ env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 6
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 3);
+
+ block.Unblock();
+ env.Env.WaitFor("blocked TEvRequest 4", [&]{ return completed1 && completed2; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT(block.empty());
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1, 1}));
+ // CHANGE LATER: counters->CacheMissPages->Val() == 8
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 4);
+}
+
+Y_UNIT_TEST(Two_Transactions_Two_Keys) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ BasicSetup(env);
+
+ Cerr << "...making read" << Endl;
+
+ TBlockEvents<NSharedCache::TEvRequest> block(env.Env);
+
+ TRetriedCounters retried1, retried2;
+ bool completed1, completed2;
+ env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried1, completed1) });
+ env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({66}, retried2, completed2) });
+
+ // b-tree index root is common page:
+ // CHANGE LATER: block.size() == 2
+ env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0);
+
+ block.Unblock();
+ env.Env.WaitFor("blocked TEvRequest 2", [&]{ return block.size() == 2; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 1
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 1);
+
+ block.Unblock();
+ env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 2; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 4
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 3);
+
+ block.Unblock();
+ env.Env.WaitFor("blocked TEvRequest 3", [&]{ return block.size() == 2; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed1, false);
+ UNIT_ASSERT_VALUES_EQUAL(completed2, false);
+ // CHANGE LATER: counters->CacheMissPages->Val() == 6
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 5);
+
+ block.Unblock();
+ env.Env.WaitFor("blocked TEvRequest 4", [&]{ return completed1 && completed2; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT(block.empty());
+ UNIT_ASSERT_VALUES_EQUAL(retried1, (TVector<ui32>{1, 1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(retried2, (TVector<ui32>{1, 1, 1, 1, 1}));
+ // CHANGE LATER: counters->CacheMissPages->Val() == 8
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 7);
+}
+
+Y_UNIT_TEST(Compaction) {
+ TMyEnvBase env;
+ auto counters = GetSharedPageCounters(env);
+
+ BasicSetup(env);
+
+ Cerr << "...making read" << Endl;
+
+ TBlockEvents<NSharedCache::TEvRequest> block(env.Env);
+
+ TRetriedCounters retried;
+ bool completed;
+ env.SendAsync(new NFake::TEvExecute{ new TTxReadRows({33}, retried, completed) });
+
+ env.Env.WaitFor("blocked TEvRequest 1", [&]{ return block.size() == 1; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1}));
+ UNIT_ASSERT_VALUES_EQUAL(completed, false);
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 0);
+
+ block.Stop();
+
+ Cerr << "...compacting" << Endl;
+ env.SendSync(new NFake::TEvCompact(TableId));
+ Cerr << "...waiting until compacted" << Endl;
+ env.WaitFor<NFake::TEvCompacted>();
+
+ // page collection invalidation activates waiting transactions:
+ env.Env.WaitFor("blocked TEvRequest 2", [&]{ return completed; }, TDuration::Seconds(10));
+ LogCounters(counters);
+ UNIT_ASSERT_VALUES_EQUAL(retried, (TVector<ui32>{1, 1, 1, 1, 1, 1}));
+ UNIT_ASSERT_VALUES_EQUAL(counters->CacheMissPages->Val(), 121);
+
+ block.Unblock();
+ env->SimulateSleep(TDuration::Seconds(3));
+ // nothing should crash
+}
+
+}
+
}