diff options
| -rw-r--r-- | ydb/core/tx/datashard/datashard_repl_apply.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_replication.cpp | 42 |
2 files changed, 49 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 58779eeef4c..db15e9595db 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -1,4 +1,6 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" +#include "setup_sys_locks.h" #include <util/string/escape.h> @@ -24,6 +26,9 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { Y_UNUSED(ctx); + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guardLocks(*Self, &locksDb); + if (Self->State != TShardState::Ready) { Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, @@ -84,6 +89,7 @@ public: } if (MvccReadWriteVersion) { + Self->PromoteImmediatePostExecuteEdges(*MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); Pipeline.AddCommittingOp(*MvccReadWriteVersion); } @@ -92,6 +98,7 @@ public: NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK); } + Self->SysLocksTable().ApplyLocks(); return true; } diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index 30267d6537b..8d5df4025d1 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -1,11 +1,13 @@ #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> #include "datashard_active_transaction.h" +#include "datashard_ut_common_kqp.h" #include <ydb/core/tx/tx_proxy/proxy.h> namespace NKikimr { using namespace NKikimr::NDataShard; +using namespace NKikimr::NDataShard::NKqpHelpers; using namespace NSchemeShard; using namespace Tests; @@ -307,6 +309,46 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { }, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED); } + Y_UNIT_TEST(ApplyChangesWithConcurrentTx) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Weak) + ); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 }, + }); + + TString sessionId; + TString txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 1, .WriteTxId = 0, .Key = 1, .Value = 21 }, + }); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + } + } } // namespace NKikimr |
