aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-01-12 17:47:37 +0300
committerGitHub <noreply@github.com>2024-01-12 17:47:37 +0300
commitcefc335228a97bb22d5a346b45c634a98d929a29 (patch)
treea80ab5383622b8c495b0e233ab5aaea842c5e9a0
parent37a3110d33e4eb25675f3c1b81f0ab72719b556b (diff)
downloadydb-cefc335228a97bb22d5a346b45c634a98d929a29.tar.gz
Copy locks in observer & SingleObserver (#973)
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp3
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp21
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h3
-rw-r--r--ydb/library/actors/testlib/test_runtime.h1
5 files changed, 20 insertions, 13 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 17c07969f6..3cca28881e 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4058,7 +4058,7 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
}
}
-Y_UNIT_TEST(TestSnapshotReadAfterBrokenLock) {
+Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
@@ -4073,6 +4073,9 @@ Y_UNIT_TEST(TestSnapshotReadAfterBrokenLock) {
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ auto rows = EvWrite ? TEvWriteRows{{{1, 1}}, {{2, 2}}, {{3, 3}}, {{5, 5}}} : TEvWriteRows{};
+ auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index 870295488a..b2a6a33c87 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -36,8 +36,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{};
- auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
- auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows);
+ auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);"));
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 3b9253077e..f4e34337ed 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -1877,11 +1877,11 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
-TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
+TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
if (rows.empty())
return {};
- return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
+ auto requestObserver = runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
return;
@@ -1939,18 +1939,21 @@ TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWri
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+ // Copy locks
+ if (tx.HasLockTxId())
+ evWrite->Record.SetLockTxId(tx.GetLockTxId());
+ if (tx.HasLockNodeId())
+ evWrite->Record.SetLockNodeId(tx.GetLockNodeId());
+ if (tx.GetKqpTransaction().HasLocks())
+ evWrite->Record.MutableLocks()->CopyFrom(tx.GetKqpTransaction().GetLocks());
+
// Replace event
auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie);
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
event.Reset(handle);
});
-}
-
-TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
- if (rows.empty())
- return {};
- return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
+ auto responseObserver = runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
return;
@@ -1971,6 +1974,8 @@ TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWit
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
event.Reset(handle);
});
+
+ return {std::move(requestObserver), std::move(responseObserver)};
}
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index 6c0831e0db..752631f4e7 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -749,8 +749,7 @@ class TEvWriteRows : public std::vector<TEvWriteRow> {
}
};
-TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
-TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
+TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h
index 4fc308812d..bc83438630 100644
--- a/ydb/library/actors/testlib/test_runtime.h
+++ b/ydb/library/actors/testlib/test_runtime.h
@@ -349,6 +349,7 @@ namespace NActors {
TEventObserverCollection* List;
TEventObserverCollection::iterator Iter;
};
+ using TEventObserverHolderPair = std::pair<TEventObserverHolder,TEventObserverHolder>;
// An example of using AddObserver in unit tests
/*