aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan Nikolaev <ivannik@ydb.tech>2025-03-07 12:31:42 +0300
committerGitHub <noreply@github.com>2025-03-07 12:31:42 +0300
commitaf2a9a90be724d0a06c3bac393e4ef49fde38f51 (patch)
tree81c6307af90efb4a183acaa42eacdd29bfa6ce43
parent9ee43447b4ffac9f221fc1e8a754f0f946e42628 (diff)
downloadydb-af2a9a90be724d0a06c3bac393e4ef49fde38f51.tar.gz
DataShard: clean readsets in DataCleanup (#15438)
-rw-r--r--ydb/core/tx/datashard/datashard__data_cleanup.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp206
2 files changed, 165 insertions, 43 deletions
diff --git a/ydb/core/tx/datashard/datashard__data_cleanup.cpp b/ydb/core/tx/datashard/datashard__data_cleanup.cpp
index b1342c33909..9d261614db6 100644
--- a/ydb/core/tx/datashard/datashard__data_cleanup.cpp
+++ b/ydb/core/tx/datashard/datashard__data_cleanup.cpp
@@ -55,6 +55,8 @@ public:
"DataCleanup of tablet# " << Self->TabletID()
<< ": expired snapshots removed");
}
+ Self->OutReadSets.Cleanup(db, ctx);
+
Self->Executor()->CleanupData(Ev->Get()->Record.GetDataCleanupGeneration());
Self->DataCleanupWaiters.insert({Ev->Get()->Record.GetDataCleanupGeneration(), Ev->Sender});
return true;
diff --git a/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp b/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp
index a464c8a3ef1..0a918aa010a 100644
--- a/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_data_cleanup.cpp
@@ -6,10 +6,27 @@ using namespace Tests;
Y_UNIT_TEST_SUITE(DataCleanup) {
- static const TString DeletedShortValue("Some_value");
- static const TString DeletedLongValue(size_t(100 * 1024), 't');
- static const TString PresentShortValue("Some_other_value");
- static const TString PresentLongValue(size_t(100 * 1024), 'r');
+ static const TString DeletedSubkey1("Subkey1");
+ static const TString PresentSubkey2("Subkey2");
+ static const TString PresentSubkey3("Subkey3");
+ static const TString DeletedSubkey4("Subkey4");
+
+ static const TString DeletedShortValue1("_Some_value_1_");
+ static const TString PresentLongValue2(size_t(100 * 1024), 'r');
+ static const TString PresentShortValue3("_Some_value_3_");
+ static const TString DeletedLongValue4(size_t(100 * 1024), 't');
+
+ int CountBlobsWithSubstring(ui64 tabletId, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& substring) {
+ int res = 0;
+ for (const auto& proxyDS : proxyDSs) {
+ for (const auto& [id, blob] : proxyDS->AllMyBlobs()) {
+ if (id.TabletID() == tabletId && !blob.DoNotKeep && blob.Buffer.ConvertToString().Contains(substring)) {
+ ++res;
+ }
+ }
+ }
+ return res;
+ }
bool BlobStorageContains(const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& value) {
for (const auto& proxyDS : proxyDSs) {
@@ -22,7 +39,7 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
return false;
}
- auto SetupWithTable() {
+ auto SetupWithTable(bool withCompaction) {
TVector<TServerSettings::TProxyDSPtr> proxyDSs {
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(0)),
MakeIntrusive<NFake::TProxyDS>(TGroupId::FromValue(2181038080)),
@@ -42,39 +59,50 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
auto opts = TShardedTableOptions()
.Columns({
- {"key", "Uint32", true, false},
- {"value", "Utf8", false, false}
+ {"key", "Uint32", true, false},
+ {"subkey", "String", true, false},
+ {"value", "Utf8", false, false}
});
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
UploadRows(runtime, "/Root/table-1",
- {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
- {TCell::Make(ui32(1))}, {TCell(DeletedShortValue)}
+ {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
+ {TCell::Make(ui32(1)), TCell(DeletedSubkey1)}, {TCell(DeletedShortValue1)}
);
UploadRows(runtime, "/Root/table-1",
- {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
- {TCell::Make(ui32(2))}, {TCell(PresentLongValue)}
+ {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
+ {TCell::Make(ui32(2)), TCell(PresentSubkey2)}, {TCell(PresentLongValue2)}
);
UploadRows(runtime, "/Root/table-1",
- {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
- {TCell::Make(ui32(3))}, {TCell(PresentShortValue)}
+ {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
+ {TCell::Make(ui32(3)), TCell(PresentSubkey3)}, {TCell(PresentShortValue3)}
);
UploadRows(runtime, "/Root/table-1",
- {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::UTF8}},
- {TCell::Make(ui32(4))}, {TCell(DeletedLongValue)}
+ {{"key", Ydb::Type::UINT32}, {"subkey", Ydb::Type::STRING}, {"value", Ydb::Type::UTF8}},
+ {TCell::Make(ui32(4)), TCell(DeletedSubkey4)}, {TCell(DeletedLongValue4)}
);
- auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true);
- UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK);
-
- UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue));
- UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue));
- UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue));
- UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue));
-
- ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
-
- SimulateSleep(runtime, TDuration::Seconds(2));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey1));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedSubkey4));
+
+ // short values inlined in log
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedShortValue1));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3));
+
+ if (withCompaction) {
+ auto compactionResult = CompactTable(runtime, shards.at(0), tableId, true);
+ UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::OK);
+
+ // uncompressed long values should be present only after compaction
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, DeletedLongValue4));
+ } else {
+ // before compaction long values persisted in log only in compressed format
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, PresentLongValue2));
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4));
+ }
return std::make_tuple(server, sender, shards, proxyDSs);
}
@@ -85,22 +113,31 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
UNIT_ASSERT_VALUES_EQUAL(ev.Record.GetDataCleanupGeneration(), generation);
}
- void CheckTableData(Tests::TServer::TPtr server, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs) {
- auto result = ReadShardedTable(server, "/Root/table-1");
- UNIT_ASSERT_EQUAL(result,
- "key = 2, value = " + PresentLongValue + "\n"
- "key = 3, value = " + PresentShortValue + "\n"
+ void CheckTableData(Tests::TServer::TPtr server, const TVector<TServerSettings::TProxyDSPtr>& proxyDSs, const TString& table) {
+ auto result = ReadShardedTable(server, table);
+ UNIT_ASSERT_VALUES_EQUAL(result,
+ "key = 2, subkey = " + PresentSubkey2 + ", value = " + PresentLongValue2 + "\n"
+ "key = 3, subkey = " + PresentSubkey3 + ", value = " + PresentShortValue3 + "\n"
);
- UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue));
- UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue));
- UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue));
- UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue));
+
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey1));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey2));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentSubkey3));
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedSubkey4));
+
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedShortValue1));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentLongValue2));
+ UNIT_ASSERT(BlobStorageContains(proxyDSs, PresentShortValue3));
+ UNIT_ASSERT(!BlobStorageContains(proxyDSs, DeletedLongValue4));
}
Y_UNIT_TEST(ForceDataCleanup) {
- auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
auto cleanupAndCheck = [&runtime, &sender, &tableShards](ui64 expectedDataCleanupGeneration) {
auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedDataCleanupGeneration);
@@ -114,13 +151,38 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
cleanupAndCheck(24);
cleanupAndCheck(25);
- CheckTableData(server, proxyDSs);
+ CheckTableData(server, proxyDSs, "/Root/table-1");
+ }
+
+
+ Y_UNIT_TEST(ForceDataCleanupWithoutCompaction) {
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(false);
+ auto& runtime = *server->GetRuntime();
+
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ auto cleanupAndCheck = [&runtime, &sender, &tableShards](ui64 expectedDataCleanupGeneration) {
+ auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedDataCleanupGeneration);
+
+ runtime.SendToPipe(tableShards.at(0), sender, request.Release(), 0, GetPipeConfigWithRetries());
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
+ CheckResultEvent(*ev->Get(), tableShards.at(0), expectedDataCleanupGeneration);
+ };
+
+ cleanupAndCheck(24);
+
+ CheckTableData(server, proxyDSs, "/Root/table-1");
}
Y_UNIT_TEST(MultipleDataCleanups) {
- auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
ui64 expectedGenFirst = 42;
ui64 expectedGenLast = 43;
auto request1 = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedGenFirst);
@@ -139,13 +201,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenLast);
}
- CheckTableData(server, proxyDSs);
+ CheckTableData(server, proxyDSs, "/Root/table-1");
}
Y_UNIT_TEST(MultipleDataCleanupsWithOldGenerations) {
- auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
ui64 expectedGenFirst = 42;
ui64 expectedGenOld = 10;
auto request1 = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedGenFirst);
@@ -164,13 +229,16 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), expectedGenFirst);
}
- CheckTableData(server, proxyDSs);
+ CheckTableData(server, proxyDSs, "/Root/table-1");
}
Y_UNIT_TEST(ForceDataCleanupWithRestart) {
- auto [server, sender, tableShards, proxyDSs] = SetupWithTable();
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
auto& runtime = *server->GetRuntime();
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
ui64 cleanupGeneration = 33;
ui64 oldGeneration = 10;
ui64 olderGeneration = 5;
@@ -207,7 +275,59 @@ Y_UNIT_TEST_SUITE(DataCleanup) {
CheckResultEvent(*ev->Get(), tableShards.at(0), cleanupGeneration);
}
- CheckTableData(server, proxyDSs);
+ CheckTableData(server, proxyDSs, "/Root/table-1");
+ }
+
+ Y_UNIT_TEST(OutReadSetsCleanedAfterCopyTable) {
+ auto [server, sender, tableShards, proxyDSs] = SetupWithTable(true);
+ auto& runtime = *server->GetRuntime();
+
+ UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 3); // in log + after compaction: part switch in log and sst
+
+ size_t readSetsWithDeletedSubkey1 = 0;
+ auto prevObserver = runtime.SetObserverFunc([&readSetsWithDeletedSubkey1](TAutoPtr<IEventHandle> &ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ if (msg->Record.SerializeAsString().Contains(DeletedSubkey1)) {
+ ++readSetsWithDeletedSubkey1;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1");
+ WaitTxNotification(server, sender, txIdCopy);
+ auto table2Shards = GetTableShards(server, sender, "/Root/table-2");
+ auto table2Id = ResolveTableId(server, sender, "/Root/table-2");
+
+ UNIT_ASSERT_VALUES_EQUAL(readSetsWithDeletedSubkey1, 1);
+ UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 4); // + outreadset
+ UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 1); // in log
+
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-1` WHERE key IN (1, 4);");
+ ExecSQL(server, sender, "DELETE FROM `/Root/table-2` WHERE key IN (1, 4);");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(tableShards.at(0), proxyDSs, DeletedSubkey1), 5); // + deletion in log
+ UNIT_ASSERT_VALUES_EQUAL(CountBlobsWithSubstring(table2Shards.at(0), proxyDSs, DeletedSubkey1), 2); // + deletion in log
+
+ auto cleanupAndCheck = [&runtime, &sender](ui64 tabletId, ui64 expectedDataCleanupGeneration) {
+ auto request = MakeHolder<TEvDataShard::TEvForceDataCleanup>(expectedDataCleanupGeneration);
+
+ runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries());
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvForceDataCleanupResult>(sender);
+ CheckResultEvent(*ev->Get(), tabletId, expectedDataCleanupGeneration);
+ };
+
+ cleanupAndCheck(table2Shards.at(0), 24);
+ cleanupAndCheck(tableShards.at(0), 24);
+
+ CheckTableData(server, proxyDSs, "/Root/table-1");
+ CheckTableData(server, proxyDSs, "/Root/table-2");
}
}