aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-11-01 16:36:11 +0300
committerkungasc <kungasc@yandex-team.com>2023-11-01 17:02:27 +0300
commita18f54585040aedbeeebd9d1cdcc1950146ae81a (patch)
treee7acd7ac446a92d3b24c1ee997920ae8bafbf6a1
parent7b1c4ab3d1bec352de63255e7e43f694b58104db (diff)
downloadydb-a18f54585040aedbeeebd9d1cdcc1950146ae81a.tar.gz
KIKIMR-19878 Fix missing shared cache unregister
-rw-r--r--ydb/core/testlib/basics/services.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_stats.cpp124
3 files changed, 80 insertions, 46 deletions
diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp
index 16ed1d15d9..560ca56f68 100644
--- a/ydb/core/testlib/basics/services.cpp
+++ b/ydb/core/testlib/basics/services.cpp
@@ -134,6 +134,7 @@ namespace NPDisk {
pageCollectionCacheConfig->CacheConfig = new TCacheCacheConfig(caches.Shared, nullptr, nullptr, nullptr);
pageCollectionCacheConfig->TotalAsyncQueueInFlyLimit = caches.AsyncQueue;
pageCollectionCacheConfig->TotalScanQueueInFlyLimit = caches.ScanQueue;
+ pageCollectionCacheConfig->Counters = MakeIntrusive<TSharedPageCacheCounters>(runtime.GetDynamicCounters(nodeIndex));
runtime.AddLocalService(MakeSharedPageCacheId(0),
TActorSetupCmd(
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 9257faf4df..aa5de6e1b1 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -116,6 +116,7 @@ public:
private:
void Die(const TActorContext& ctx) override {
ctx.Send(MakeResourceBrokerID(), new TEvResourceBroker::TEvNotifyActorDied);
+ ctx.Send(MakeSharedPageCacheId(), new NSharedCache::TEvUnregister);
TActorBootstrapped::Die(ctx);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp
index 9486577dd9..61baa3f479 100644
--- a/ydb/core/tx/datashard/datashard_ut_stats.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp
@@ -1,5 +1,6 @@
#include "datashard_ut_common.h"
#include "datashard_ut_common_kqp.h"
+#include "ydb/core/tablet_flat/shared_sausagecache.h"
namespace NKikimr {
@@ -9,7 +10,7 @@ using namespace Tests;
Y_UNIT_TEST_SUITE(DataShardStats) {
- NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, size_t minPartCount = 0) {
+ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, size_t minPartCount = 0, size_t minRows = 0) {
NKikimrTxDataShard::TEvPeriodicTableStats stats;
bool captured = false;
@@ -17,7 +18,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
switch (ev->GetTypeRewrite()) {
case TEvDataShard::TEvPeriodicTableStats::EventType: {
stats = ev->Get<TEvDataShard::TEvPeriodicTableStats>()->Record;
- if (stats.GetTableStats().GetPartCount() >= minPartCount) {
+ if (stats.GetTableStats().GetPartCount() >= minPartCount && stats.GetTableStats().GetRowCount() >= minRows) {
captured = true;
}
break;
@@ -44,15 +45,6 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
return stats;
}
- NKikimrTxDataShard::TEvCompactTableResult CompactTable(TTestActorRuntime& runtime, ui64 tabletId, const TPathId& pathId) {
- auto sender = runtime.AllocateEdgeActor();
- auto request = MakeHolder<TEvDataShard::TEvCompactTable>(pathId.OwnerId, pathId.LocalPathId);
- runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries());
-
- auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender);
- return ev->Get()->Record;
- }
-
NKikimrTableStats::TTableStats GetTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 tableId) {
auto sender = runtime.AllocateEdgeActor();
auto request = MakeHolder<TEvDataShard::TEvGetTableStats>(tableId);
@@ -88,29 +80,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", 1);
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 704u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}
- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 65u);
@@ -144,30 +134,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
.Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false, "hdd"}})
.Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}, {.Name = "hdd", .DataPoolKind = "hdd"}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
-
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3)");
- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 752u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}
- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 115u);
@@ -203,8 +190,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", 1);
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
const int count = 2000;
TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
@@ -215,24 +202,22 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
}
ExecSQL(server, sender, query);
- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 196096u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}
- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 30100u);
@@ -244,7 +229,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
}
{
- auto stats = GetTableStats(runtime, shards.at(0), pathId.LocalPathId);
+ auto stats = GetTableStats(runtime, shard1, tableId1.PathId.LocalPathId);
auto dataSizeHistogram = ReadHistogram(stats.GetDataSizeHistogram());
TVector<std::pair<ui64, ui64>> expectedDataSizeHistogram = {{475, 7145}, {950, 14290}, {1425, 21435}, {1900, 28580}};
@@ -287,9 +272,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
.ExternalPoolKind = "ext", .DataThreshold = 100u, .ExternalThreshold = 200u},
{.Name = "hdd", .DataPoolKind = "hdd"}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
-
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
TString smallValue(150, 'S');
TString largeValue(1500, 'L');
@@ -300,24 +284,22 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
"(4, \"" + largeValue + "\", \"BBB\"), " +
"(5, \"CCC\", \"" + largeValue + "\")");
- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 4232u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}
- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 3555u);
@@ -336,6 +318,56 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
}
}
+ Y_UNIT_TEST(SharedCacheGarbage) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "String", true, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ const int batches = 10;
+ const int batchItems = 10;
+ for (auto batch : xrange(batches)) {
+ TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
+ for (auto item = 0; item < batchItems; item++) {
+ if (item != 0)
+ query += ", ";
+ query += "(0, \"" + TString(7000, 'x') + ToString(batch * batchItems + item) + "\") ";
+ }
+ Cerr << query << Endl << Endl;
+ ExecSQL(server, sender, query);
+ CompactTable(runtime, shard1, tableId1, false);
+
+ Cerr << "... waiting for stats after compaction" << Endl;
+ auto stats = WaitTableStats(runtime, 1, (batch + 1) * batchItems);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
+ }
+
+ // each batch ~70KB, ~700KB in total
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(runtime.GetDynamicCounters());
+ Cerr << "ActiveBytes = " << counters->ActiveBytes->Val() << " PassiveBytes = " << counters->PassiveBytes->Val() << Endl;
+ UNIT_ASSERT_LE(counters->ActiveBytes->Val(), 800*1024); // one index
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardStats)
} // namespace NKikimr