aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-09 11:27:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 15:58:17 +0300
commit4c04647502a17edde71af13f347306f15a8d7c32 (patch)
tree24119f7c14d1207931eee8c11def0ccbc47e9787
parent0bab9fa2d6aac5eae69533a5ee0c68ffb46a2e92 (diff)
downloadydb-4c04647502a17edde71af13f347306f15a8d7c32.tar.gz
Add a test for mvcc switching code, fix bugs, KIKIMR-14259
ref:af5cda47e318ace63443dcf99771f88325d757ac
-rw-r--r--ydb/core/tx/datashard/check_data_tx_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp89
4 files changed, 90 insertions, 7 deletions
diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp
index 034a0b2c4c..deb9f1f903 100644
--- a/ydb/core/tx/datashard/check_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp
@@ -260,6 +260,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op,
record.GetSnapshotTxId());
if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
+ // TODO: try upgrading to mvcc snapshot when available
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder()
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index b9256ffc7d..3498162c17 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1320,11 +1320,6 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
// to be consistent while dependencies calculation
auto snapshot = dataTx->GetKqpTransaction().GetSnapshot();
tx->SetMvccSnapshot(TRowVersion(snapshot.GetStep(), snapshot.GetTxId()));
- } else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) {
- // to be consistent while dependencies calculation
- auto step = dataTx->GetReadTableTransaction().GetSnapshotStep();
- auto txId = dataTx->GetReadTableTransaction().GetSnapshotTxId();
- tx->SetMvccSnapshot(TRowVersion(step, txId));
}
}
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index 3f98283509..da1219bb9e 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -242,8 +242,6 @@ bool TSnapshotManager::AdvanceWatermark(NTable::TDatabase& db, const TRowVersion
}
void TSnapshotManager::RemoveRowVersions(NTable::TDatabase& db, const TRowVersion& from, const TRowVersion& to) {
- Y_VERIFY(IsMvccEnabled());
-
for (auto& it : Self->GetUserTables()) {
auto begin = Snapshots.lower_bound(TSnapshotKey(Self->GetPathOwnerId(), it.first, from.Step, from.TxId));
auto end = Snapshots.upper_bound(TSnapshotKey(Self->GetPathOwnerId(), it.first, to.Step, to.TxId));
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 53594e9a41..ca3bc867fa 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -885,6 +885,95 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}
+ void CompactTable(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
+ auto sender = runtime.AllocateEdgeActor();
+ auto request = MakeHolder<TEvDataShard::TEvCompactTable>(tableId.PathId.OwnerId, tableId.PathId.LocalPathId);
+ runtime.SendToPipe(shardId, sender, request.Release(), 0, GetPipeConfigWithRetries());
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender);
+ auto& result = ev->Get()->Record;
+ UNIT_ASSERT(result.GetStatus() == NKikimrTxDataShard::TEvCompactTableResult::OK);
+ }
+
+ Y_UNIT_TEST(SwitchMvccSnapshots) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ auto shards2 = GetTableShards(server, sender, "/Root/table-2");
+ auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+ auto tableId2 = ResolveTableId(server, sender, "/Root/table-2");
+
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 1), (2, 2), (3, 3);");
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 10), (20, 20), (30, 30);");
+
+ auto snapshot1 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000));
+
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 11), (2, 22), (3, 33), (4, 44);");
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 11), (20, 22), (30, 33), (40, 44);");
+
+ runtime.GetAppData().FeatureFlags.SetEnableMvccForTest(true);
+ RebootTablet(runtime, shards1.at(0), sender);
+ RebootTablet(runtime, shards2.at(0), sender);
+
+ auto snapshot2 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000));
+
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 111), (2, 222), (3, 333), (4, 444);");
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 111), (20, 222), (30, 333), (40, 444);");
+
+ auto snapshot3 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000));
+
+ CompactTable(runtime, shards1.at(0), tableId1);
+ CompactTable(runtime, shards2.at(0), tableId2);
+
+ // None of created snapshots should be removed
+ auto removed1 = GetRemovedRowVersions(server, shards1.at(0));
+ UNIT_ASSERT(!removed1.Contains(snapshot1, snapshot1.Next()));
+ UNIT_ASSERT(!removed1.Contains(snapshot2, snapshot2.Next()));
+ UNIT_ASSERT(!removed1.Contains(snapshot3, snapshot3.Next()));
+
+ // Versions to the left and to the right of the first snapshot must be removed
+ UNIT_ASSERT(removed1.Contains(snapshot1.Prev(), snapshot1));
+ UNIT_ASSERT(removed1.Contains(snapshot1.Next(), snapshot1.Next().Next()));
+
+ auto table1snapshot3 = ReadShardedTable(server, "/Root/table-1", snapshot3);
+ UNIT_ASSERT_VALUES_EQUAL(table1snapshot3,
+ "key = 1, value = 111\n"
+ "key = 2, value = 222\n"
+ "key = 3, value = 333\n"
+ "key = 4, value = 444\n");
+
+ auto table1snapshot2 = ReadShardedTable(server, "/Root/table-1", snapshot2);
+ UNIT_ASSERT_VALUES_EQUAL(table1snapshot2,
+ "key = 1, value = 11\n"
+ "key = 2, value = 22\n"
+ "key = 3, value = 33\n"
+ "key = 4, value = 44\n");
+
+ auto table1snapshot1 = ReadShardedTable(server, "/Root/table-1", snapshot1);
+ UNIT_ASSERT_VALUES_EQUAL(table1snapshot1,
+ "key = 1, value = 1\n"
+ "key = 2, value = 2\n"
+ "key = 3, value = 3\n");
+ }
+
}
} // namespace NKikimr