summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/datashard/datashard_repl_apply.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_replication.cpp42
2 files changed, 49 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp
index 58779eeef4c..db15e9595db 100644
--- a/ydb/core/tx/datashard/datashard_repl_apply.cpp
+++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp
@@ -1,4 +1,6 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
+#include "setup_sys_locks.h"
#include <util/string/escape.h>
@@ -24,6 +26,9 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guardLocks(*Self, &locksDb);
+
if (Self->State != TShardState::Ready) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
@@ -84,6 +89,7 @@ public:
}
if (MvccReadWriteVersion) {
+ Self->PromoteImmediatePostExecuteEdges(*MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
Pipeline.AddCommittingOp(*MvccReadWriteVersion);
}
@@ -92,6 +98,7 @@ public:
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
}
+ Self->SysLocksTable().ApplyLocks();
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp
index 30267d6537b..8d5df4025d1 100644
--- a/ydb/core/tx/datashard/datashard_ut_replication.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp
@@ -1,11 +1,13 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "datashard_active_transaction.h"
+#include "datashard_ut_common_kqp.h"
#include <ydb/core/tx/tx_proxy/proxy.h>
namespace NKikimr {
using namespace NKikimr::NDataShard;
+using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;
@@ -307,6 +309,46 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
}, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED);
}
+ Y_UNIT_TEST(ApplyChangesWithConcurrentTx) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Weak)
+ );
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ ApplyChanges(server, shards.at(0), tableId, "my-source", {
+ TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 },
+ });
+
+ TString sessionId;
+ TString txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"),
+ "{ items { uint32_value: 1 } items { uint32_value: 11 } }");
+
+ ApplyChanges(server, shards.at(0), tableId, "my-source", {
+ TChange{ .Offset = 1, .WriteTxId = 0, .Key = 1, .Value = 21 },
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, "SELECT key, value FROM `/Root/table-1`;"),
+ "{ items { uint32_value: 1 } items { uint32_value: 11 } }");
+ }
+
}
} // namespace NKikimr