diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-09 11:27:33 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 4c04647502a17edde71af13f347306f15a8d7c32 (patch) | |
tree | 24119f7c14d1207931eee8c11def0ccbc47e9787 | |
parent | 0bab9fa2d6aac5eae69533a5ee0c68ffb46a2e92 (diff) | |
download | ydb-4c04647502a17edde71af13f347306f15a8d7c32.tar.gz |
Add a test for mvcc switching code, fix bugs, KIKIMR-14259
ref:af5cda47e318ace63443dcf99771f88325d757ac
-rw-r--r-- | ydb/core/tx/datashard/check_data_tx_unit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_snapshots.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 89 |
4 files changed, 90 insertions, 7 deletions
diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp index 034a0b2c4c..deb9f1f903 100644 --- a/ydb/core/tx/datashard/check_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp @@ -260,6 +260,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op, record.GetSnapshotTxId()); if (!DataShard.GetSnapshotManager().AcquireReference(key)) { + // TODO: try upgrading to mvcc snapshot when available BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError( NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, TStringBuilder() diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index b9256ffc7d..3498162c17 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1320,11 +1320,6 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: // to be consistent while dependencies calculation auto snapshot = dataTx->GetKqpTransaction().GetSnapshot(); tx->SetMvccSnapshot(TRowVersion(snapshot.GetStep(), snapshot.GetTxId())); - } else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) { - // to be consistent while dependencies calculation - auto step = dataTx->GetReadTableTransaction().GetSnapshotStep(); - auto txId = dataTx->GetReadTableTransaction().GetSnapshotTxId(); - tx->SetMvccSnapshot(TRowVersion(step, txId)); } } diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp index 3f98283509..da1219bb9e 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_snapshots.cpp @@ -242,8 +242,6 @@ bool TSnapshotManager::AdvanceWatermark(NTable::TDatabase& db, const TRowVersion } void TSnapshotManager::RemoveRowVersions(NTable::TDatabase& db, const TRowVersion& from, const TRowVersion& to) { - Y_VERIFY(IsMvccEnabled()); - for (auto& it : Self->GetUserTables()) { auto begin = Snapshots.lower_bound(TSnapshotKey(Self->GetPathOwnerId(), it.first, from.Step, from.TxId)); auto end = Snapshots.upper_bound(TSnapshotKey(Self->GetPathOwnerId(), it.first, to.Step, to.TxId)); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 53594e9a41..ca3bc867fa 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -885,6 +885,95 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } } + void CompactTable(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) { + auto sender = runtime.AllocateEdgeActor(); + auto request = MakeHolder<TEvDataShard::TEvCompactTable>(tableId.PathId.OwnerId, tableId.PathId.LocalPathId); + runtime.SendToPipe(shardId, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender); + auto& result = ev->Get()->Record; + UNIT_ASSERT(result.GetStatus() == NKikimrTxDataShard::TEvCompactTableResult::OK); + } + + Y_UNIT_TEST(SwitchMvccSnapshots) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + auto shards2 = GetTableShards(server, sender, "/Root/table-2"); + auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + auto tableId2 = ResolveTableId(server, sender, "/Root/table-2"); + + ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 1), (2, 2), (3, 3);"); + ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 10), (20, 20), (30, 30);"); + + auto snapshot1 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000)); + + ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 11), (2, 22), (3, 33), (4, 44);"); + ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 11), (20, 22), (30, 33), (40, 44);"); + + runtime.GetAppData().FeatureFlags.SetEnableMvccForTest(true); + RebootTablet(runtime, shards1.at(0), sender); + RebootTablet(runtime, shards2.at(0), sender); + + auto snapshot2 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000)); + + ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 111), (2, 222), (3, 333), (4, 444);"); + ExecSQL(server, sender, "UPSERT INTO [/Root/table-2] (key, value) VALUES (10, 111), (20, 222), (30, 333), (40, 444);"); + + auto snapshot3 = CreateVolatileSnapshot(server, { "/Root/table-1", "/Root/table-2" }, TDuration::MilliSeconds(30000)); + + CompactTable(runtime, shards1.at(0), tableId1); + CompactTable(runtime, shards2.at(0), tableId2); + + // None of created snapshots should be removed + auto removed1 = GetRemovedRowVersions(server, shards1.at(0)); + UNIT_ASSERT(!removed1.Contains(snapshot1, snapshot1.Next())); + UNIT_ASSERT(!removed1.Contains(snapshot2, snapshot2.Next())); + UNIT_ASSERT(!removed1.Contains(snapshot3, snapshot3.Next())); + + // Versions to the left and to the right of the first snapshot must be removed + UNIT_ASSERT(removed1.Contains(snapshot1.Prev(), snapshot1)); + UNIT_ASSERT(removed1.Contains(snapshot1.Next(), snapshot1.Next().Next())); + + auto table1snapshot3 = ReadShardedTable(server, "/Root/table-1", snapshot3); + UNIT_ASSERT_VALUES_EQUAL(table1snapshot3, + "key = 1, value = 111\n" + "key = 2, value = 222\n" + "key = 3, value = 333\n" + "key = 4, value = 444\n"); + + auto table1snapshot2 = ReadShardedTable(server, "/Root/table-1", snapshot2); + UNIT_ASSERT_VALUES_EQUAL(table1snapshot2, + "key = 1, value = 11\n" + "key = 2, value = 22\n" + "key = 3, value = 33\n" + "key = 4, value = 44\n"); + + auto table1snapshot1 = ReadShardedTable(server, "/Root/table-1", snapshot1); + UNIT_ASSERT_VALUES_EQUAL(table1snapshot1, + "key = 1, value = 1\n" + "key = 2, value = 2\n" + "key = 3, value = 3\n"); + } + } } // namespace NKikimr |