summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp41
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp28
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp108
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h2
6 files changed, 39 insertions, 156 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 847b4a6ac41..878a7be355e 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1028,8 +1028,10 @@ public:
switch (shardState) {
case IKqpTransactionManager::EXECUTING:
YQL_ENSURE(Mode == EMode::COMMIT || Mode == EMode::IMMEDIATE_COMMIT);
+ break;
case IKqpTransactionManager::PREPARED:
YQL_ENSURE(Mode == EMode::PREPARE);
+ break;
case IKqpTransactionManager::PREPARING:
case IKqpTransactionManager::FINISHED:
case IKqpTransactionManager::ERROR:
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 95452266dea..64dbf04f01a 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -1354,6 +1354,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup);
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
@@ -1374,9 +1375,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);
{
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
}
@@ -1407,16 +1405,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
// Send a commit request, it would block on readset exchange
auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true));
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
-
// Wait until we captured both readsets
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2;
{
@@ -1467,6 +1460,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup);
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
@@ -1487,9 +1481,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);
{
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
}
@@ -1521,16 +1512,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
// Send a commit request, it would block on readset exchange
auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true));
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
-
// Wait until we captured both readsets
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2;
if (readSets.size() < expectedReadSets) {
@@ -1545,9 +1531,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
// Now send non-conflicting upsert to both tables
{
- auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {5, 3}}, {tableId2, {6, 3}}} : TEvWriteRows{};
- auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1);
-
blockReadSets = false; // needed for volatile transactions
auto result = KqpSimpleExec(runtime, Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);
@@ -1558,9 +1541,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
// Check that immediate non-conflicting upsert is working too
{
- auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {7, 4}}} : TEvWriteRows{};
- auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1);
-
auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4)"));
UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
}
@@ -2970,6 +2950,7 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup);
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
@@ -2990,9 +2971,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);
{
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
Cerr << "===== UPSERT initial rows" << Endl;
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
@@ -3042,9 +3020,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
Cerr << "===== UPSERT and commit" << Endl;
// Send a commit request, it would block on readset exchange
@@ -3070,8 +3045,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
-
// Select key 3 and verify its value was updated
{
Cerr << "===== Last SELECT" << Endl;
@@ -3291,10 +3264,13 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
}
Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
- .SetUseRealThreads(false);
+ .SetUseRealThreads(false)
+ .SetAppConfig(app);
Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
@@ -3305,9 +3281,6 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
- auto rows = EvWrite ? TEvWriteRows{{{1, 1}}, {{2, 2}}, {{3, 3}}, {{5, 5}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 252e414d895..94d46ecda11 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -3705,7 +3705,16 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
// 2. tx2: upsert into range2 > range1 range and commit.
// 3. tx1: read range2 -> lock should be broken
- TTestHelper helper;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(app);
+
+ TTestHelper helper(serverSettings);
auto readVersion = CreateVolatileSnapshot(
helper.Server,
@@ -3724,9 +3733,6 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1);
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0);
- auto rows = EvWrite ? TEvWriteRows{{{300, 0, 0, 3000}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(*helper.Server->GetRuntime(), rows);
-
// write new data above snapshot
ExecSQL(helper.Server, helper.Sender, R"(
SELECT * FROM `/Root/table-1` WHERE key1 == 300;
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index 8d2254d68a4..8852ff8438d 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -40,14 +40,20 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
Y_UNIT_TEST_TWIN(ExecSQLUpsertImmediate, EvWrite) {
- auto [runtime, server, sender] = TestCreateServer();
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(app);
+
+ auto [runtime, server, sender] = TestCreateServer(serverSettings);
TShardedTableOptions opts;
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
- auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
Cout << "========= Send immediate write =========\n";
{
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
@@ -63,7 +69,16 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
Y_UNIT_TEST_QUAD(ExecSQLUpsertPrepared, EvWrite, Volatile) {
- auto [runtime, server, sender] = TestCreateServer();
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableOltpSink(EvWrite);
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(app);
+
+ auto [runtime, server, sender] = TestCreateServer(serverSettings);
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile);
@@ -71,9 +86,6 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", opts);
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {0, 1}}, {tableId2, {2, 3}}} : TEvWriteRows{};
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
-
Cout << "========= Send distributed write =========\n";
{
ExecSQL(server, sender, Q_(
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 6e19f17694b..8dff10e4301 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -2257,114 +2257,6 @@ NKikimrDataEvents::TEvWriteResult Update(TTestActorRuntime& runtime, TActorId se
return Write(runtime, sender, shardId, std::move(request), expectedStatus);
}
-TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
- if (rows.empty())
- return {};
-
- auto requestObserver = runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
- if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
- return;
-
- const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record;
-
- if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA)
- return;
-
- // Parse original TEvProposeTransaction
- const ui64 txId = record.GetTxId();
- const TString& txBody = record.GetTxBody();
- NKikimrTxDataShard::TDataTransaction tx;
- Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size()));
-
- // Construct new EvWrite
- TVector<TCell> cells;
- TTableId tableId;
- ui16 colCount = 0;
- for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
- NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
- Y_VERIFY(task.GetMeta().UnpackTo(&meta));
- if (!meta.HasWrites())
- continue;
-
- const auto& tableMeta = meta.GetTable();
- TTableId tableIdProto(tableMeta.GetTableId().GetOwnerId(), tableMeta.GetTableId().GetTableId(), tableMeta.GetSchemaVersion());
- Y_VERIFY_S(tableId == TTableId{} || tableId == tableIdProto, "Only writes to one table is supported now");
- tableId = tableIdProto;
- const auto& writes = meta.GetWrites();
- Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now.");
- colCount = writes.GetColumns().size();
-
- const auto& row = rows.ProcessRow(tableId, txId);
- Y_VERIFY(row.Cells.size() == colCount);
- std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells));
- }
-
- Cerr << "TEvProposeTransaction " << txId << " is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl;
-
- auto txMode = NKikimr::NDataShard::NEvWrite::TConvertor::GetTxMode(record.GetFlags());
-
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
-
- if (!cells.empty()) {
- TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount);
- TString blobData = matrix.ReleaseBuffer();
-
- UNIT_ASSERT(blobData.size() < 8_MB);
-
- std::vector<ui32> columnIds(colCount);
- std::iota(columnIds.begin(), columnIds.end(), 1);
-
- ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
- evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
- }
-
- // Copy locks
- if (tx.HasLockTxId())
- evWrite->Record.SetLockTxId(tx.GetLockTxId());
- if (tx.HasLockNodeId())
- evWrite->Record.SetLockNodeId(tx.GetLockNodeId());
- if (tx.GetKqpTransaction().HasLocks())
- evWrite->Record.MutableLocks()->CopyFrom(tx.GetKqpTransaction().GetLocks());
-
- if (record.HasMvccSnapshot()) {
- *evWrite->Record.MutableMvccSnapshot() = record.GetMvccSnapshot();
- }
-
- // Replace event
- auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie);
- handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
- event.Reset(handle);
- });
-
- auto responseObserver = runtime.AddObserver([](TAutoPtr<IEventHandle>& event) {
- if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
- return;
-
- const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record;
- ui64 txId = record.GetTxId();
-
- Cerr << "EvWriteResult " << txId << " is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl;
-
- // Construct new EvProposeTransactionResult
- ui64 origin = record.GetOrigin();
- auto status = NKikimr::NDataShard::NEvWrite::TConvertor::GetStatus(record.GetStatus());
-
- auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status);
-
- if (status == NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED) {
- evResult->SetPrepared(record.GetMinStep(), record.GetMaxStep(), {});
- evResult->Record.MutableDomainCoordinators()->CopyFrom(record.GetDomainCoordinators());
- }
-
- // Replace event
- auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie);
- handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
- event.Reset(handle);
- });
-
- return {std::move(requestObserver), std::move(responseObserver)};
-}
-
NKikimrDataEvents::TEvWriteResult WaitForWriteCompleted(TTestActorRuntime& runtime, TActorId sender, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
{
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index 2c42040fdca..da08a456312 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -836,8 +836,6 @@ class TEvWriteRows : public std::vector<TEvWriteRow> {
}
};
-TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
-
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
struct TSendProposeToCoordinatorOptions {