diff options
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_write.cpp | 28 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp | 108 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.h | 2 |
6 files changed, 39 insertions, 156 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 847b4a6ac41..878a7be355e 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1028,8 +1028,10 @@ public: switch (shardState) { case IKqpTransactionManager::EXECUTING: YQL_ENSURE(Mode == EMode::COMMIT || Mode == EMode::IMMEDIATE_COMMIT); + break; case IKqpTransactionManager::PREPARED: YQL_ENSURE(Mode == EMode::PREPARE); + break; case IKqpTransactionManager::PREPARING: case IKqpTransactionManager::FINISHED: case IKqpTransactionManager::ERROR: diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 95452266dea..64dbf04f01a 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1354,6 +1354,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup); + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) @@ -1374,9 +1375,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) { auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1); { - auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); } @@ -1407,16 +1405,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) { }; auto prevObserverFunc = runtime.SetObserverFunc(captureRS); - auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - // Send a commit request, it would block on readset exchange auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true)); - evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{}; - // Wait until we captured both readsets const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; { @@ -1467,6 +1460,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup); + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetAppConfig(app) @@ -1487,9 +1481,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1); { - auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); } @@ -1521,16 +1512,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { }; auto prevObserverFunc = runtime.SetObserverFunc(captureRS); - auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - // Send a commit request, it would block on readset exchange auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true)); - evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{}; - // Wait until we captured both readsets const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; if (readSets.size() < expectedReadSets) { @@ -1545,9 +1531,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { // Now send non-conflicting upsert to both tables { - auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {5, 3}}, {tableId2, {6, 3}}} : TEvWriteRows{}; - auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1); - blockReadSets = false; // needed for volatile transactions auto result = KqpSimpleExec(runtime, Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3); @@ -1558,9 +1541,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { // Check that immediate non-conflicting upsert is working too { - auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {7, 4}}} : TEvWriteRows{}; - auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1); - auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4)")); UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); } @@ -2970,6 +2950,7 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup); + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) @@ -2990,9 +2971,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1); { - auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - Cerr << "===== UPSERT initial rows" << Endl; ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); @@ -3042,9 +3020,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri }; auto prevObserverFunc = runtime.SetObserverFunc(captureRS); - auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - Cerr << "===== UPSERT and commit" << Endl; // Send a commit request, it would block on readset exchange @@ -3070,8 +3045,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } - evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{}; - // Select key 3 and verify its value was updated { Cerr << "===== Last SELECT" << Endl; @@ -3291,10 +3264,13 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) { } Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") - .SetUseRealThreads(false); + .SetUseRealThreads(false) + .SetAppConfig(app); Tests::TServer::TPtr server = new TServer(serverSettings); auto &runtime = *server->GetRuntime(); @@ -3305,9 +3281,6 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) { CreateShardedTable(server, sender, "/Root", "table-1", 1); CreateShardedTable(server, sender, "/Root", "table-2", 1); - auto rows = EvWrite ? TEvWriteRows{{{1, 1}}, {{2, 2}}, {{3, 3}}, {{5, 5}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)")); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 252e414d895..94d46ecda11 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -3705,7 +3705,16 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { // 2. tx2: upsert into range2 > range1 range and commit. // 3. tx1: read range2 -> lock should be broken - TTestHelper helper; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(app); + + TTestHelper helper(serverSettings); auto readVersion = CreateVolatileSnapshot( helper.Server, @@ -3724,9 +3733,6 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); - auto rows = EvWrite ? TEvWriteRows{{{300, 0, 0, 3000}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(*helper.Server->GetRuntime(), rows); - // write new data above snapshot ExecSQL(helper.Server, helper.Sender, R"( SELECT * FROM `/Root/table-1` WHERE key1 == 300; diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index 8d2254d68a4..8852ff8438d 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -40,14 +40,20 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { } Y_UNIT_TEST_TWIN(ExecSQLUpsertImmediate, EvWrite) { - auto [runtime, server, sender] = TestCreateServer(); + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(app); + + auto [runtime, server, sender] = TestCreateServer(serverSettings); TShardedTableOptions opts; auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); - auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - Cout << "========= Send immediate write =========\n"; { ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);")); @@ -63,7 +69,16 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { } Y_UNIT_TEST_QUAD(ExecSQLUpsertPrepared, EvWrite, Volatile) { - auto [runtime, server, sender] = TestCreateServer(); + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite); + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(app); + + auto [runtime, server, sender] = TestCreateServer(serverSettings); runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile); @@ -71,9 +86,6 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts); auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", opts); - auto rows = EvWrite ? TEvWriteRows{{tableId1, {0, 1}}, {tableId2, {2, 3}}} : TEvWriteRows{}; - auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); - Cout << "========= Send distributed write =========\n"; { ExecSQL(server, sender, Q_( diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 6e19f17694b..8dff10e4301 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -2257,114 +2257,6 @@ NKikimrDataEvents::TEvWriteResult Update(TTestActorRuntime& runtime, TActorId se return Write(runtime, sender, shardId, std::move(request), expectedStatus); } -TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) { - if (rows.empty()) - return {}; - - auto requestObserver = runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) { - if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction) - return; - - const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record; - - if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA) - return; - - // Parse original TEvProposeTransaction - const ui64 txId = record.GetTxId(); - const TString& txBody = record.GetTxBody(); - NKikimrTxDataShard::TDataTransaction tx; - Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size())); - - // Construct new EvWrite - TVector<TCell> cells; - TTableId tableId; - ui16 colCount = 0; - for (const auto& task : tx.GetKqpTransaction().GetTasks()) { - NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; - Y_VERIFY(task.GetMeta().UnpackTo(&meta)); - if (!meta.HasWrites()) - continue; - - const auto& tableMeta = meta.GetTable(); - TTableId tableIdProto(tableMeta.GetTableId().GetOwnerId(), tableMeta.GetTableId().GetTableId(), tableMeta.GetSchemaVersion()); - Y_VERIFY_S(tableId == TTableId{} || tableId == tableIdProto, "Only writes to one table is supported now"); - tableId = tableIdProto; - const auto& writes = meta.GetWrites(); - Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now."); - colCount = writes.GetColumns().size(); - - const auto& row = rows.ProcessRow(tableId, txId); - Y_VERIFY(row.Cells.size() == colCount); - std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells)); - } - - Cerr << "TEvProposeTransaction " << txId << " is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl; - - auto txMode = NKikimr::NDataShard::NEvWrite::TConvertor::GetTxMode(record.GetFlags()); - - auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode); - - if (!cells.empty()) { - TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount); - TString blobData = matrix.ReleaseBuffer(); - - UNIT_ASSERT(blobData.size() < 8_MB); - - std::vector<ui32> columnIds(colCount); - std::iota(columnIds.begin(), columnIds.end(), 1); - - ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); - } - - // Copy locks - if (tx.HasLockTxId()) - evWrite->Record.SetLockTxId(tx.GetLockTxId()); - if (tx.HasLockNodeId()) - evWrite->Record.SetLockNodeId(tx.GetLockNodeId()); - if (tx.GetKqpTransaction().HasLocks()) - evWrite->Record.MutableLocks()->CopyFrom(tx.GetKqpTransaction().GetLocks()); - - if (record.HasMvccSnapshot()) { - *evWrite->Record.MutableMvccSnapshot() = record.GetMvccSnapshot(); - } - - // Replace event - auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie); - handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite()); - event.Reset(handle); - }); - - auto responseObserver = runtime.AddObserver([](TAutoPtr<IEventHandle>& event) { - if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult) - return; - - const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record; - ui64 txId = record.GetTxId(); - - Cerr << "EvWriteResult " << txId << " is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl; - - // Construct new EvProposeTransactionResult - ui64 origin = record.GetOrigin(); - auto status = NKikimr::NDataShard::NEvWrite::TConvertor::GetStatus(record.GetStatus()); - - auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status); - - if (status == NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED) { - evResult->SetPrepared(record.GetMinStep(), record.GetMaxStep(), {}); - evResult->Record.MutableDomainCoordinators()->CopyFrom(record.GetDomainCoordinators()); - } - - // Replace event - auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie); - handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite()); - event.Reset(handle); - }); - - return {std::move(requestObserver), std::move(responseObserver)}; -} - NKikimrDataEvents::TEvWriteResult WaitForWriteCompleted(TTestActorRuntime& runtime, TActorId sender, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus) { auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 2c42040fdca..da08a456312 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -836,8 +836,6 @@ class TEvWriteRows : public std::vector<TEvWriteRow> { } }; -TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows); - void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values); struct TSendProposeToCoordinatorOptions { |
