aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-06-30 11:48:12 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-06-30 11:48:12 +0300
commitfd7cbab1745211b326c9e51338a5a98db935c4ab (patch)
tree735d3ec20f769502c8d1193da43138d2858c8ce4
parenta2a90a1a4b9bc941c2398b0ff31cbac85619ec2b (diff)
downloadydb-fd7cbab1745211b326c9e51338a5a98db935c4ab.tar.gz
Faster lock removal in datashard, KIKIMR-14732
ref:a6b3f9fad1e1aca817c5603fe8b1c7bfe8b87952
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/common/kqp_gateway.h2
-rw-r--r--ydb/core/kqp/common/kqp_transform.h5
-rw-r--r--ydb/core/kqp/executer/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp10
-rw-r--r--ydb/core/kqp/executer/kqp_executer.h11
-rw-r--r--ydb/core/kqp/host/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/host/kqp_run_data.cpp6
-rw-r--r--ydb/core/kqp/host/kqp_run_physical.cpp2
-rw-r--r--ydb/core/kqp/host/kqp_run_physical.h4
-rw-r--r--ydb/core/kqp/host/kqp_run_scan.cpp3
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp1
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp4
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/protos/tx_datashard.proto3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt3
-rw-r--r--ydb/core/tx/datashard/datashard.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h3
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h8
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h8
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h3
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp58
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h71
-rw-r--r--ydb/core/tx/datashard/datashard_ut_locks.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp22
-rw-r--r--ydb/core/tx/datashard/direct_tx_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp3
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp3
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/operation.h1
-rw-r--r--ydb/core/tx/datashard/remove_locks.cpp68
-rw-r--r--ydb/core/tx/datashard/setup_sys_locks.h1
-rw-r--r--ydb/core/tx/long_tx_service/public/lock_handle.h4
39 files changed, 301 insertions, 73 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 040aabe401..6ffe5b7ebd 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -543,6 +543,7 @@ add_subdirectory(ydb/core/kqp/runtime)
add_subdirectory(ydb/core/kqp/common)
add_subdirectory(ydb/core/kqp/expr_nodes)
add_subdirectory(ydb/library/yql/dq/expr_nodes)
+add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/library/yql/dq/actors)
add_subdirectory(ydb/library/yql/dq/common)
add_subdirectory(ydb/core/ydb_convert)
@@ -583,7 +584,6 @@ add_subdirectory(ydb/public/lib/value)
add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/services/lib/sharding)
-add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 71aa93dc49..fe4661448d 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -623,6 +623,7 @@ add_subdirectory(ydb/core/kqp/runtime)
add_subdirectory(ydb/core/kqp/common)
add_subdirectory(ydb/core/kqp/expr_nodes)
add_subdirectory(ydb/library/yql/dq/expr_nodes)
+add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/library/yql/dq/actors)
add_subdirectory(ydb/library/yql/dq/common)
add_subdirectory(ydb/core/ydb_convert)
@@ -663,7 +664,6 @@ add_subdirectory(ydb/public/lib/value)
add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/services/lib/sharding)
-add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt
index 460eefd0f9..bda9e7d47c 100644
--- a/ydb/core/kqp/common/CMakeLists.txt
+++ b/ydb/core/kqp/common/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-common PUBLIC
ydb-core-engine
core-kqp-expr_nodes
core-kqp-provider
+ tx-long_tx_service-public
ydb-library-aclib
yql-core-issue
yql-dq-actors
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index d15793fd1c..7c826e2afc 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -6,6 +6,7 @@
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/common/dq_value.h>
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <library/cpp/actors/core/actorid.h>
@@ -128,6 +129,7 @@ public:
struct TExecPhysicalResult : public TGenericResult {
NKikimrKqp::TExecuterTxResult ExecuterResult;
+ NLongTxService::TLockHandle LockHandle;
};
struct TAstQuerySettings {
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index 8d68686b47..3e60200e46 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -7,6 +7,8 @@
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
+
#include <ydb/library/yql/dq/common/dq_value.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -59,13 +61,14 @@ struct TKqpTxLocks {
NKikimrMiniKQL::TType LockType;
NKikimrMiniKQL::TListType LocksListType;
THashMap<TKqpTxLock::TKey, TKqpTxLock> LocksMap;
+ NLongTxService::TLockHandle LockHandle;
TMaybe<NYql::TIssue> LockIssue;
bool HasLocks() const { return !LocksMap.empty(); }
bool Broken() const { return LockIssue.Defined(); }
void MarkBroken(NYql::TIssue lockIssue) { LockIssue.ConstructInPlace(std::move(lockIssue)); }
- ui64 GetLockTxId() const { return HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; }
+ ui64 GetLockTxId() const { return LockHandle ? LockHandle.GetLockId() : HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; }
size_t Size() const { return LocksMap.size(); }
void ReportIssues(NYql::TExprContext& ctx) {
diff --git a/ydb/core/kqp/executer/CMakeLists.txt b/ydb/core/kqp/executer/CMakeLists.txt
index 0dcc0d1c5b..1147ca042e 100644
--- a/ydb/core/kqp/executer/CMakeLists.txt
+++ b/ydb/core/kqp/executer/CMakeLists.txt
@@ -23,6 +23,7 @@ target_link_libraries(core-kqp-executer PUBLIC
core-kqp-compile
core-kqp-rm
ydb-core-protos
+ tx-long_tx_service-public
ydb-core-ydb_convert
ydb-library-mkql_proto
library-mkql_proto-protos
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 86b1943104..da92e4df2e 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -19,6 +19,7 @@
#include <ydb/core/tx/coordinator/coordinator_impl.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
@@ -30,6 +31,7 @@ namespace NKqp {
using namespace NYql;
using namespace NYql::NDq;
+using namespace NLongTxService;
namespace {
@@ -1182,6 +1184,7 @@ private:
if (lockTxId) {
dataTransaction.SetLockTxId(*lockTxId);
+ dataTransaction.SetLockNodeId(SelfId().NodeId());
}
for (auto& task : dataTransaction.GetKqpTransaction().GetTasks()) {
@@ -1616,6 +1619,7 @@ private:
auto lockTxId = Request.AcquireLocksTxId;
if (lockTxId.Defined() && *lockTxId == 0) {
lockTxId = TxId;
+ LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
}
// first, start compute tasks
@@ -1707,6 +1711,9 @@ private:
}
if (!Locks.empty()) {
+ if (LockHandle) {
+ ResponseEv->LockHandle = std::move(LockHandle);
+ }
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
}
@@ -1849,6 +1856,9 @@ private:
// Temporary storage during snapshot acquisition
TVector<NDqProto::TDqTask> ComputeTasks;
THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs;
+
+ // Lock handle for a newly acquired lock
+ TLockHandle LockHandle;
};
} // namespace
diff --git a/ydb/core/kqp/executer/kqp_executer.h b/ydb/core/kqp/executer/kqp_executer.h
index 13a0c10e5b..11ed832027 100644
--- a/ydb/core/kqp/executer/kqp_executer.h
+++ b/ydb/core/kqp/executer/kqp_executer.h
@@ -3,6 +3,7 @@
#include <ydb/core/kqp/common/kqp_common.h>
#include <ydb/core/kqp/common/kqp_gateway.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/kqp.pb.h>
@@ -14,7 +15,15 @@ struct TEvKqpExecuter {
TKqpExecuterEvents::EvTxRequest> {};
struct TEvTxResponse : public TEventPB<TEvTxResponse, NKikimrKqp::TEvExecuterTxResponse,
- TKqpExecuterEvents::EvTxResponse> {};
+ TKqpExecuterEvents::EvTxResponse>
+ {
+ NLongTxService::TLockHandle LockHandle;
+
+ bool IsSerializable() const override {
+ // We cannot serialize LockHandle, should always send locally
+ return false;
+ }
+ };
struct TEvStreamData : public TEventPB<TEvStreamData, NKikimrKqp::TEvExecuterStreamData,
TKqpExecuterEvents::EvStreamData> {};
diff --git a/ydb/core/kqp/host/CMakeLists.txt b/ydb/core/kqp/host/CMakeLists.txt
index a22010a8ea..5324cac490 100644
--- a/ydb/core/kqp/host/CMakeLists.txt
+++ b/ydb/core/kqp/host/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-host PUBLIC
core-kqp-opt
core-kqp-prepare
core-kqp-provider
+ tx-long_tx_service-public
yql-core-services
yql-minikql-invoke_builtins
library-yql-sql
diff --git a/ydb/core/kqp/host/kqp_run_data.cpp b/ydb/core/kqp/host/kqp_run_data.cpp
index 65b57268e3..aaf3a9c6c3 100644
--- a/ydb/core/kqp/host/kqp_run_data.cpp
+++ b/ydb/core/kqp/host/kqp_run_data.cpp
@@ -102,7 +102,11 @@ protected:
return TStatus::Async;
}
- bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override {
+ bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NLongTxService::TLockHandle&& lockHandle, TExprContext& ctx, bool commit) override {
+ if (lockHandle) {
+ TxState->Tx().Locks.LockHandle = std::move(lockHandle);
+ }
+
if (execResult.HasLocks()) {
YQL_ENSURE(!commit);
diff --git a/ydb/core/kqp/host/kqp_run_physical.cpp b/ydb/core/kqp/host/kqp_run_physical.cpp
index a48fed6179..e4b8dfe13a 100644
--- a/ydb/core/kqp/host/kqp_run_physical.cpp
+++ b/ydb/core/kqp/host/kqp_run_physical.cpp
@@ -134,7 +134,7 @@ IGraphTransformer::TStatus TKqpExecutePhysicalTransformerBase::DoApplyAsyncChang
TransformState->TxResults.emplace_back(std::move(txResults));
}
- if (!OnExecuterResult(std::move(execResult), ctx, ExecuteFlags.HasFlags(TKqpExecuteFlag::Commit))) {
+ if (!OnExecuterResult(std::move(execResult), std::move(result.LockHandle), ctx, ExecuteFlags.HasFlags(TKqpExecuteFlag::Commit))) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/host/kqp_run_physical.h b/ydb/core/kqp/host/kqp_run_physical.h
index 941873955f..a5a7674ed7 100644
--- a/ydb/core/kqp/host/kqp_run_physical.h
+++ b/ydb/core/kqp/host/kqp_run_physical.h
@@ -3,6 +3,7 @@
#include <ydb/core/kqp/common/kqp_gateway.h>
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
#include <ydb/core/kqp/prepare/kqp_prepare.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
@@ -43,7 +44,8 @@ protected:
virtual TStatus DoExecute(std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit, NYql::TExprContext& ctx) = 0;
virtual TStatus DoRollback() = 0;
- virtual bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NYql::TExprContext& ctx, bool commit) = 0;
+ virtual bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult,
+ NLongTxService::TLockHandle&& lockHandle, NYql::TExprContext& ctx, bool commit) = 0;
protected:
TKqpParamsMap PrepareParameters(const NKqpProto::TKqpPhyTx& tx);
diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp
index 70b2a661a0..a2b1db9279 100644
--- a/ydb/core/kqp/host/kqp_run_scan.cpp
+++ b/ydb/core/kqp/host/kqp_run_scan.cpp
@@ -56,9 +56,10 @@ protected:
YQL_ENSURE(false, "Rollback in ScanQuery tx");
}
- bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override {
+ bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NLongTxService::TLockHandle&& lockHandle, TExprContext& ctx, bool commit) override {
Y_UNUSED(ctx);
Y_UNUSED(commit);
+ Y_UNUSED(lockHandle);
if (execResult.HasStats()) {
TransformCtx->QueryStats.AddExecutions()->Swap(execResult.MutableStats());
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 2e038d4bbf..bef32238ea 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -881,6 +881,7 @@ private:
}
result.ExecuterResult.Swap(response->MutableResult());
+ result.LockHandle = std::move(ev->Get()->LockHandle);
Promise.SetValue(std::move(result));
this->PassAway();
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index fe12058da3..884da540e4 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1072,6 +1072,10 @@ public:
auto& txResult = *response->MutableResult();
QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult));
+ if (ev->Get()->LockHandle) {
+ QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle);
+ }
+
if (!MergeLocksWithTxResult(txResult)) {
return;
}
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index d68c7d1387..bcc87adcb1 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -410,4 +410,5 @@ enum ETxTypes {
TXTYPE_APPLY_REPLICATION_CHANGES = 68 [(TxTypeOpts) = {Name: "TxApplyReplicationChanges"}];
TXTYPE_READ = 69 [(TxTypeOpts) = {Name: "TxRead"}];
TXTYPE_SPLIT_REPLICATION_SOURCE_OFFSETS = 70 [(TxTypeOpts) = {Name: "TxSplitReplicationSourceOffsets"}];
+ TXTYPE_REMOVE_LOCK = 71 [(TxTypeOpts) = {Name: "TxRemoveLock"}];
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 84601ccb72..21f62e903e 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -258,6 +258,9 @@ message TDataTransaction {
optional uint64 CancelDeadlineMs = 14; // Wallclock timestamp from datareq (not duration)
optional uint64 CancelAfterMs = 15; // Duration from tx start local to datashard
optional bool CollectStats = 16;
+
+ // Datashard will subscribe to lock status when node id is non-zero
+ optional uint32 LockNodeId = 17;
}
message TCreateVolatileSnapshot {
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 8d3ff703fb..f30b9af363 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -37,6 +37,7 @@ target_link_libraries(core-tx-datashard PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-tablet_flat
+ tx-long_tx_service-public
ydb-core-util
ydb-core-wrappers
ydb-core-ydb_convert
@@ -187,6 +188,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_ops.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_treap.cpp
@@ -281,6 +283,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-tablet_flat
+ tx-long_tx_service-public
ydb-core-util
ydb-core-wrappers
ydb-core-ydb_convert
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 9c7bc61c8e..2e64b818bc 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
#include <library/cpp/monlib/service/pages/templates.h>
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 8c2955a68c..cadf3cbf6d 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -280,7 +280,7 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) {
///
class TDataShardEngineHost : public TEngineHost {
public:
- TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, TInstant now)
+ TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
: TEngineHost(db, counters,
TEngineHostSettings(self->TabletID(),
(self->State == TShardState::Readonly || self->State == TShardState::Frozen),
@@ -289,6 +289,7 @@ public:
, Self(self)
, DB(db)
, LockTxId(lockTxId)
+ , LockNodeId(lockNodeId)
, Now(now)
{}
@@ -357,7 +358,7 @@ public:
if (LockTxId) {
// Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs
if (key.RowOperation != TKeyDesc::ERowOperation::Read &&
- (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot))
+ (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot || !LockNodeId))
{
key.Status = TKeyDesc::EStatus::OperationNotSupported;
return false;
@@ -381,7 +382,7 @@ public:
return DataShardSysTable(tableId).SelectRow(row, columnIds, returnType, readTarget, holderFactory);
}
- Self->SysLocksTable().SetLock(tableId, row, LockTxId);
+ Self->SysLocksTable().SetLock(tableId, row, LockTxId, LockNodeId);
Self->SetTableAccessTime(tableId, Now);
return TEngineHost::SelectRow(tableId, row, columnIds, returnType, readTarget, holderFactory);
@@ -394,7 +395,7 @@ public:
{
Y_VERIFY(!TSysTables::IsSystemTable(tableId), "SelectRange no system table is not supported");
- Self->SysLocksTable().SetLock(tableId, range, LockTxId);
+ Self->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId);
Self->SetTableAccessTime(tableId, Now);
return TEngineHost::SelectRange(tableId, range, columnIds, skipNullKeys, returnType, readTarget,
@@ -550,6 +551,7 @@ private:
TDataShard* Self;
NTable::TDatabase& DB;
const ui64& LockTxId;
+ const ui32& LockNodeId;
bool IsImmediateTx = false;
bool IsRepeatableSnapshot = false;
TInstant Now;
@@ -564,9 +566,10 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
std::pair<ui64, ui64> stepTxId)
: StepTxId(stepTxId)
, LockTxId(0)
+ , LockNodeId(0)
{
auto now = TAppData::TimeProvider->Now();
- EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, now);
+ EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now);
EngineSettings = MakeHolder<TEngineFlatSettings>(IEngineFlat::EProtocol::V1, AppData(ctx)->FunctionRegistry,
*TAppData::RandomProvider, *TAppData::TimeProvider, EngineHost.Get(), self->AllocCounters);
@@ -741,10 +744,11 @@ IEngineFlat * TEngineBay::GetEngine() {
return Engine.Get();
}
-void TEngineBay::SetLockTxId(ui64 lockTxId) {
+void TEngineBay::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) {
LockTxId = lockTxId;
+ LockNodeId = lockNodeId;
if (ComputeCtx) {
- ComputeCtx->SetLockTxId(lockTxId);
+ ComputeCtx->SetLockTxId(lockTxId, lockNodeId);
}
}
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 2a1374cac1..b01f64aedb 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -44,7 +44,7 @@ public:
const NMiniKQL::IEngineFlat * GetEngine() const { return Engine.Get(); }
NMiniKQL::IEngineFlat * GetEngine();
- void SetLockTxId(ui64 lockTxId);
+ void SetLockTxId(ui64 lockTxId, ui32 lockNodeId);
void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; }
EResult Validate() {
@@ -110,6 +110,7 @@ private:
TValidationInfo Info;
TEngineHostCounters EngineHostCounters;
ui64 LockTxId;
+ ui32 LockNodeId;
NYql::NDq::TLogFunc KqpLogFunc;
THolder<NUdf::IApplyContext> KqpApplyCtx;
THolder<NMiniKQL::TKqpDatashardComputeContext> ComputeCtx;
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index a325090a9a..48882a072c 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -46,7 +46,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
"One of the fields should be set: MiniKQL, ReadTableTransaction, KqpTransaction");
if (Tx.GetLockTxId())
- EngineBay.SetLockTxId(Tx.GetLockTxId());
+ EngineBay.SetLockTxId(Tx.GetLockTxId(), Tx.GetLockNodeId());
if (Tx.GetImmediate())
EngineBay.SetIsImmediateTx();
@@ -286,8 +286,14 @@ bool TValidatedDataTx::CheckCancelled() {
void TValidatedDataTx::ReleaseTxData() {
TxBody = "";
auto lock = Tx.GetLockTxId();
+ auto lockNode = Tx.GetLockNodeId();
Tx.Clear();
- Tx.SetLockTxId(lock);
+ if (lock) {
+ Tx.SetLockTxId(lock);
+ }
+ if (lockNode) {
+ Tx.SetLockNodeId(lockNode);
+ }
EngineBay.DestroyEngine();
IsReleased = true;
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 2330acacb2..d206e7d0bf 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -138,6 +138,7 @@ public:
const TString& Body() const { return TxBody; }
ui64 LockTxId() const { return Tx.GetLockTxId(); }
+ ui32 LockNodeId() const { return Tx.GetLockNodeId(); }
ui64 ProgramSize() const { return Tx.GetMiniKQL().size(); }
bool Immediate() const { return Tx.GetImmediate(); }
bool ReadOnly() const { return Tx.GetReadOnly(); }
@@ -506,6 +507,13 @@ public:
return 0;
}
+ ui32 LockNodeId() const override
+ {
+ if (DataTx)
+ return DataTx->LockNodeId();
+ return 0;
+ }
+
bool HasLockedWrites() const override
{
if (DataTx)
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index acc6d2063d..5068c4c6f6 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -35,6 +35,7 @@
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tablet_flat/flat_page_iface.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/protos/tx.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>
@@ -54,6 +55,7 @@ extern TStringBuf SnapshotTransferReadSetMagic;
using NTabletFlatExecutor::ITransaction;
using NTabletFlatExecutor::TScanOptions;
+using NLongTxService::TEvLongTxService;
// For CopyTable and MoveShadow
class TTxTableSnapshotContext : public NTabletFlatExecutor::TTableSnapshotContext {
@@ -205,6 +207,7 @@ class TDataShard
class TTxCompactBorrowed;
class TTxCompactTable;
class TTxPersistFullCompactionTs;
+ class TTxRemoveLock;
template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
@@ -1024,6 +1027,8 @@ class TDataShard
void Handle(TEvDataShard::TEvApplyReplicationChanges::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
+
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
void DoPeriodicTasks(const TActorContext &ctx);
@@ -1536,6 +1541,8 @@ public:
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx);
+ void SubscribeNewLocks(const TActorContext &ctx);
+
private:
///
class TLoanReturnTracker {
@@ -2392,6 +2399,7 @@ protected:
fFunc(TEvDataShard::EvGetReplicationSourceOffsets, HandleByReplicationSourceOffsetsServer);
fFunc(TEvDataShard::EvReplicationSourceOffsetsAck, HandleByReplicationSourceOffsetsServer);
fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer);
+ HFunc(TEvLongTxService::TEvLockStatus, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index ef92e632c5..a557d87a2b 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -145,21 +145,22 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable
}
void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const {
- Shard->SysLocksTable().SetLock(tableId, range, LockTxId);
+ Shard->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId);
Shard->SetTableAccessTime(tableId, Now);
}
void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const {
- Shard->SysLocksTable().SetLock(tableId, key, LockTxId);
+ Shard->SysLocksTable().SetLock(tableId, key, LockTxId, LockNodeId);
Shard->SetTableAccessTime(tableId, Now);
}
void TKqpDatashardComputeContext::BreakSetLocks() const {
- Shard->SysLocksTable().BreakSetLocks(LockTxId);
+ Shard->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId);
}
-void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId) {
+void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) {
LockTxId = lockTxId;
+ LockNodeId = lockNodeId;
}
void TKqpDatashardComputeContext::SetReadVersion(TRowVersion readVersion) {
@@ -187,6 +188,7 @@ TActorId TKqpDatashardComputeContext::GetTaskOutputChannel(ui64 taskId, ui64 cha
void TKqpDatashardComputeContext::Clear() {
Database = nullptr;
LockTxId = 0;
+ LockNodeId = 0;
}
bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidatedKey>& keys, ui64 pageFaultCount) {
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index b6dccb27cf..a0e2072be5 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -31,7 +31,7 @@ public:
TString GetTablePath(const TTableId& tableId) const;
const NDataShard::TUserTable* GetTable(const TTableId& tableId) const;
void BreakSetLocks() const;
- void SetLockTxId(ui64 lockTxId);
+ void SetLockTxId(ui64 lockTxId, ui32 lockNodeId);
const NDataShard::TUserTable::TUserColumn& GetKeyColumnInfo(
const NDataShard::TUserTable& table, ui32 keyIndex) const;
@@ -108,6 +108,7 @@ private:
TEngineHost& EngineHost;
TInstant Now;
ui64 LockTxId = 0;
+ ui32 LockNodeId = 0;
bool PersistentChannels = false;
bool TabletNotReady = false;
TRowVersion ReadVersion = TRowVersion::Min();
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index ba5de5a9a6..2726a06aa6 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -10,9 +10,10 @@ namespace NDataShard {
// TLockInfo
-TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId)
+TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
: Locker(locker)
, LockId(lockId)
+ , LockNodeId(lockNodeId)
, Counter(locker->IncCounter())
, CreationTime(TAppData::TimeProvider->Now())
{}
@@ -108,8 +109,8 @@ void TTableLocks::BreakAllLocks(const TRowVersion& at) {
// TLockLocker
-TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockTxId);
+TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) {
+ TLockInfo::TPtr lock = GetOrAddLock(lockTxId, lockNodeId);
if (!lock || lock->IsBroken(at))
return lock;
@@ -121,8 +122,8 @@ TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, const THashSet<TPathId>
return lock;
}
-TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, const TPointKey& point, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockId);
+TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, ui32 lockNodeId, const TPointKey& point, const TRowVersion& at) {
+ TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId);
if (!lock || lock->IsBroken(at))
return lock;
@@ -132,8 +133,8 @@ TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, const TPointKey& point, c
return lock;
}
-TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, const TRangeKey& range, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockId);
+TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TRangeKey& range, const TRowVersion& at) {
+ TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId);
if (!lock || lock->IsBroken(at))
return lock;
@@ -223,16 +224,25 @@ void TLockLocker::RemoveBrokenRanges() {
}
}
-TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId) {
+TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
auto it = Locks.find(lockId);
if (it != Locks.end()) {
Limiter.TouchLock(lockId);
+ if (lockNodeId && !it->second->LockNodeId) {
+ // This shouldn't ever happen, but better safe than sorry
+ it->second->LockNodeId = lockNodeId;
+ PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
+ }
return it->second;
}
- TLockInfo::TPtr lock = Limiter.TryAddLock(lockId);
- if (lock)
+ TLockInfo::TPtr lock = Limiter.TryAddLock(lockId, lockNodeId);
+ if (lock) {
Locks[lockId] = lock;
+ if (lockNodeId) {
+ PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
+ }
+ }
return lock;
}
@@ -330,9 +340,13 @@ void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) {
}
}
+void TLockLocker::RemoveSubscribedLock(ui64 lockId) {
+ RemoveLock(lockId);
+}
+
// TLockLocker.TLockLimiter
-TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId) {
+TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId, ui32 lockNodeId) {
#if 1
if (LocksQueue.Size() >= LockLimit()) {
Parent->RemoveBrokenLocks();
@@ -352,7 +366,7 @@ TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId) {
}
LocksQueue.Insert(lockId, TAppData::TimeProvider->Now());
- return TLockInfo::TPtr(new TLockInfo(Parent, lockId));
+ return TLockInfo::TPtr(new TLockInfo(Parent, lockId, lockNodeId));
}
void TLockLocker::TLockLimiter::RemoveLock(ui64 lockId) {
@@ -414,7 +428,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
if (Update->BreakOwn) {
counter = TLock::ErrorAlreadyBroken;
} else if (Update->ShardLock) {
- TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->AffectedTables, checkVersion);
+ TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->LockNodeId, Update->AffectedTables, checkVersion);
if (lock) {
Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
counter = lock->GetCounter(checkVersion);
@@ -423,7 +437,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
}
} else {
for (const auto& key : Update->PointLocks) {
- TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, key, checkVersion);
+ TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, Update->LockNodeId, key, checkVersion);
if (lock) {
Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
counter = lock->GetCounter(checkVersion);
@@ -433,7 +447,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
}
for (const auto& key : Update->RangeLocks) {
- TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, key, checkVersion);
+ TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, Update->LockNodeId, key, checkVersion);
if (lock) {
Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
counter = lock->GetCounter(checkVersion);
@@ -523,20 +537,20 @@ void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) {
Update->EraseLock(GetLockId(key));
}
-void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId) {
+void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
return;
if (lockTxId) {
Y_VERIFY(Update);
- Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId);
+ Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId, lockNodeId);
}
}
-void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId) {
+void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId) {
if (range.Point) { // if range is point replace it with a point lock
- SetLock(tableId, range.From, lockTxId);
+ SetLock(tableId, range.From, lockTxId, lockNodeId);
return;
}
@@ -546,7 +560,7 @@ void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64
if (lockTxId) {
Y_VERIFY(Update);
- Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId);
+ Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId, lockNodeId);
}
}
@@ -570,11 +584,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) {
Update->BreakShardLock();
}
-void TSysLocks::BreakSetLocks(ui64 lockTxId) {
+void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) {
Y_VERIFY(Update);
if (lockTxId)
- Update->BreakSetLocks(lockTxId);
+ Update->BreakSetLocks(lockTxId, lockNodeId);
}
bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const {
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 19c68c4d88..e1389538f8 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -8,6 +8,7 @@
#include <ydb/core/tablet/tablet_counters.h>
#include <library/cpp/cache/cache.h>
+#include <util/generic/list.h>
#include <util/generic/queue.h>
#include <util/generic/set.h>
@@ -135,6 +136,22 @@ struct TVersionedLockId {
}
};
+struct TPendingSubscribeLock {
+ ui64 LockId = 0;
+ ui32 LockNodeId = 0;
+
+ TPendingSubscribeLock() = default;
+
+ TPendingSubscribeLock(ui64 lockId, ui32 lockNodeId)
+ : LockId(lockId)
+ , LockNodeId(lockNodeId)
+ { }
+
+ explicit operator bool() const {
+ return LockId != 0;
+ }
+};
+
/// Aggregates shard, point and range locks
class TLockInfo : public TSimpleRefCount<TLockInfo> {
friend class TTableLocks;
@@ -143,7 +160,7 @@ class TLockInfo : public TSimpleRefCount<TLockInfo> {
public:
using TPtr = TIntrusivePtr<TLockInfo>;
- TLockInfo(TLockLocker * locker, ui64 lockId);
+ TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
~TLockInfo();
ui64 GetCounter(const TRowVersion& at = TRowVersion::Max()) const { return !BreakVersion || at < *BreakVersion ? Counter : Max<ui64>(); }
@@ -157,6 +174,7 @@ public:
bool MayHavePointsAndRanges() const { return !ShardLock && (!BreakVersion || *BreakVersion); }
ui64 GetLockId() const { return LockId; }
+ ui32 GetLockNodeId() const { return LockNodeId; }
TInstant GetCreationTime() const { return CreationTime; }
const THashSet<TPathId>& GetAffectedTables() const { return AffectedTables; }
@@ -173,6 +191,7 @@ private:
private:
TLockLocker * Locker;
ui64 LockId;
+ ui32 LockNodeId;
ui64 Counter;
TInstant CreationTime;
THashSet<TPathId> AffectedTables;
@@ -256,7 +275,7 @@ public:
ui64 LocksCount() const { return Parent->LocksCount(); }
- TLockInfo::TPtr TryAddLock(ui64 lockId);
+ TLockInfo::TPtr TryAddLock(ui64 lockId, ui32 lockNodeId);
void RemoveLock(ui64 lockId);
void TouchLock(ui64 lockId);
@@ -280,9 +299,9 @@ public:
Tables.clear();
}
- TLockInfo::TPtr AddShardLock(ui64 lockTxId, const THashSet<TPathId>& affectedTables, const TRowVersion& at);
- TLockInfo::TPtr AddPointLock(ui64 lockTxId, const TPointKey& key, const TRowVersion& at);
- TLockInfo::TPtr AddRangeLock(ui64 lockTxId, const TRangeKey& key, const TRowVersion& at);
+ TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at);
+ TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at);
+ TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at);
TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const;
ui64 LocksCount() const { return Locks.size(); }
@@ -326,6 +345,17 @@ public:
// optimisation: set to remove broken lock at next Remove()
void ScheduleLockCleanup(ui64 lockId, const TRowVersion& at);
+ TPendingSubscribeLock NextPendingSubscribeLock() {
+ TPendingSubscribeLock result;
+ if (!PendingSubscribeLocks.empty()) {
+ result = PendingSubscribeLocks.front();
+ PendingSubscribeLocks.pop_front();
+ }
+ return result;
+ }
+
+ void RemoveSubscribedLock(ui64 lockId);
+
ui64 IncCounter() { return Counter++; };
private:
@@ -337,6 +367,7 @@ private:
TVector<ui64> CleanupPending; // LockIds of broken locks with pending cleanup
TPriorityQueue<TVersionedLockId> BrokenCandidates;
TPriorityQueue<TVersionedLockId> CleanupCandidates;
+ TList<TPendingSubscribeLock> PendingSubscribeLocks;
TLockLimiter Limiter;
ui64 Counter;
@@ -348,7 +379,7 @@ private:
void RemoveBrokenRanges();
- TLockInfo::TPtr GetOrAddLock(ui64 lockId);
+ TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
void RemoveOneLock(ui64 lockId);
void RemoveBrokenLocks();
};
@@ -356,6 +387,7 @@ private:
/// A portion of locks update
struct TLocksUpdate {
ui64 LockTxId = 0;
+ ui32 LockNodeId = 0;
TVector<TPointKey> PointLocks;
TVector<TRangeKey> RangeLocks;
TVector<TPointKey> PointBreaks;
@@ -373,6 +405,7 @@ struct TLocksUpdate {
void Clear() {
LockTxId = 0;
+ LockNodeId = 0;
ShardLock = false;
ShardBreak = false;
PointLocks.clear();
@@ -385,15 +418,15 @@ struct TLocksUpdate {
return ShardLock || PointLocks.size() || RangeLocks.size();
}
- void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId) {
- Y_VERIFY(LockTxId == lockId);
+ void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId, ui32 lockNodeId) {
+ Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
AffectedTables.insert(tableId.PathId);
RangeTables.insert(tableId.PathId);
RangeLocks.push_back(range);
}
- void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId) {
- Y_VERIFY(LockTxId == lockId);
+ void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId, ui32 lockNodeId) {
+ Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
AffectedTables.insert(tableId.PathId);
PointLocks.push_back(key);
}
@@ -414,8 +447,8 @@ struct TLocksUpdate {
Erases.push_back(lockId);
}
- void BreakSetLocks(ui64 lockId) {
- Y_VERIFY(LockTxId == lockId);
+ void BreakSetLocks(ui64 lockId, ui32 lockNodeId) {
+ Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
BreakOwn = true;
}
};
@@ -468,11 +501,11 @@ public:
ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const;
TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
void EraseLock(const TArrayRef<const TCell>& syslockKey);
- void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId);
- void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId);
+ void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
+ void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId);
void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
void BreakAllLocks(const TTableId& tableId);
- void BreakSetLocks(ui64 lockTxId);
+ void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId);
bool IsMyKey(const TArrayRef<const TCell>& key) const;
ui64 LocksCount() const { return Locker.LocksCount(); }
@@ -489,6 +522,14 @@ public:
return true;
}
+ TPendingSubscribeLock NextPendingSubscribeLock() {
+ return Locker.NextPendingSubscribeLock();
+ }
+
+ void RemoveSubscribedLock(ui64 lockId) {
+ Locker.RemoveSubscribedLock(lockId);
+ }
+
private:
THolder<TLocksDataShard> Self;
TLockLocker Locker;
diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp
index 1d25c39f99..a3c54efe86 100644
--- a/ydb/core/tx/datashard/datashard_ut_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp
@@ -159,12 +159,12 @@ namespace NTest {
template <typename T>
void SetLock(const TPointKey<T>& key) {
- Locks.SetLock(TableId, key.GetRow(), LockId());
+ Locks.SetLock(TableId, key.GetRow(), LockId(), 0);
}
template <typename T>
void SetLock(const TRangeKey<T>& range) {
- Locks.SetLock(TableId, range.GetRowsRange(), LockId());
+ Locks.SetLock(TableId, range.GetRowsRange(), LockId(), 0);
}
template <typename T>
@@ -173,7 +173,7 @@ namespace NTest {
}
void BreakSetLocks() {
- Locks.BreakSetLocks(LockId());
+ Locks.BreakSetLocks(LockId(), 0);
}
//
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index ab4092496a..be1a27b030 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1502,11 +1502,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
SimulateSleep(server, TDuration::Seconds(1));
- auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto execSimpleRequest = [&](const TString& query, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) -> TString {
auto reqSender = runtime.AllocateEdgeActor();
auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), expectedStatus);
if (response.GetResponse().GetResults().size() == 0) {
return "";
}
@@ -1554,8 +1554,10 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
};
ui64 lastLockTxId = 0;
+ ui32 lastLockNodeId = 0;
TRowVersion lastMvccSnapshot = TRowVersion::Min();
ui64 injectLockTxId = 0;
+ ui32 injectLockNodeId = 0;
TRowVersion injectMvccSnapshot = TRowVersion::Min();
auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
switch (ev->GetTypeRewrite()) {
@@ -1590,8 +1592,12 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
if (tx.GetLockTxId()) {
lastLockTxId = tx.GetLockTxId();
+ lastLockNodeId = tx.GetLockNodeId();
} else if (injectLockTxId) {
tx.SetLockTxId(injectLockTxId);
+ if (injectLockNodeId) {
+ tx.SetLockNodeId(injectLockNodeId);
+ }
TString txBody;
Y_VERIFY(tx.SerializeToString(&txBody));
record.SetTxBody(txBody);
@@ -1626,20 +1632,30 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
// We should have been acquiring locks
Y_VERIFY(lastLockTxId != 0);
ui64 snapshotLockTxId = lastLockTxId;
+ ui32 snapshotLockNodeId = lastLockNodeId;
Y_VERIFY(lastMvccSnapshot);
auto snapshotVersion = lastMvccSnapshot;
// Perform an immediate write, pretending it happens as part of the above snapshot tx
injectLockTxId = snapshotLockTxId;
+ injectLockNodeId = snapshotLockNodeId;
injectMvccSnapshot = snapshotVersion;
UNIT_ASSERT_VALUES_EQUAL(
execSimpleRequest(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
- )")),
+ )"),
+ UseNewEngine ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::UNAVAILABLE),
"");
injectLockTxId = 0;
+ injectLockNodeId = 0;
injectMvccSnapshot = TRowVersion::Min();
+ // Old engine doesn't support LockNodeId
+ // There's nothing to test unless we can write uncommitted data
+ if (!UseNewEngine) {
+ return;
+ }
+
// Start another snapshot read, it should not see above write (it's uncommitted)
TString sessionId2, txId2;
UNIT_ASSERT_VALUES_EQUAL(
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index d477ca5514..d237bbdbcb 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -41,6 +41,7 @@ public:
op->ChangeRecords() = std::move(tx->GetCollectedChanges());
DataShard.SysLocksTable().ApplyLocks();
+ DataShard.SubscribeNewLocks(ctx);
Pipeline.AddCommittingOp(op);
return EExecutionStatus::DelayCompleteNoMoreRestarts;
diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
index 5a09528adc..0ae61b8af8 100644
--- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
@@ -23,7 +23,7 @@ public:
return !op->HasRuntimeConflicts();
}
- EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override {
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
Y_VERIFY(op->IsCommitWritesTx());
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
@@ -46,6 +46,7 @@ public:
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
DataShard.SysLocksTable().ApplyLocks();
+ DataShard.SubscribeNewLocks(ctx);
Pipeline.AddCommittingOp(op);
return EExecutionStatus::ExecutedNoMoreRestarts;
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 9f6c59189b..b6ac426d25 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -24,7 +24,7 @@ public:
private:
void ExecuteDataTx(TOperation::TPtr op,
const TActorContext& ctx);
- void AddLocksToResult(TOperation::TPtr op);
+ void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx);
};
TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard,
@@ -254,15 +254,15 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
}
if (counters.InvisibleRowSkips) {
- DataShard.SysLocksTable().BreakSetLocks(op->LockTxId());
+ DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId());
}
- AddLocksToResult(op);
+ AddLocksToResult(op, ctx);
Pipeline.AddCommittingOp(op);
}
-void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) {
+void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) {
auto locks = DataShard.SysLocksTable().ApplyLocks();
for (const auto& lock : locks) {
if (lock.IsError()) {
@@ -273,6 +273,7 @@ void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) {
op->Result()->AddTxLock(lock.LockId, lock.DataShard, lock.Generation, lock.Counter,
lock.SchemeShard, lock.PathId);
}
+ DataShard.SubscribeNewLocks(ctx);
}
void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index f16db8eaa3..bc6fda112e 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -29,7 +29,7 @@ public:
return !op->HasRuntimeConflicts();
}
- EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override {
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
Y_VERIFY(op->IsDistributedEraseTx());
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
@@ -82,6 +82,7 @@ public:
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
DataShard.SysLocksTable().ApplyLocks();
+ DataShard.SubscribeNewLocks(ctx);
Pipeline.AddCommittingOp(op);
return EExecutionStatus::ExecutedNoMoreRestarts;
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index d1e02434d0..b60562af1a 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -137,6 +137,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
KqpRollbackLockChanges(tabletId, tx, DataShard, txc);
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
DataShard.SysLocksTable().ApplyLocks();
+ DataShard.SubscribeNewLocks(ctx);
return EExecutionStatus::Executed;
}
@@ -181,7 +182,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
if (dataTx->GetCounters().InvisibleRowSkips) {
- DataShard.SysLocksTable().BreakSetLocks(op->LockTxId());
+ DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId());
}
AddLocksToResult(op, ctx);
@@ -242,6 +243,7 @@ void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorCo
LOG_T("add lock to result: " << op->Result()->Record.GetTxLocks().rbegin()->ShortDebugString());
}
+ DataShard.SubscribeNewLocks(ctx);
}
EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx,
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 3d330806c7..167f265f6a 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -672,6 +672,7 @@ public:
return GetKeysInfo().ReadsCount + GetKeysInfo().WritesCount;
}
virtual ui64 LockTxId() const { return 0; }
+ virtual ui32 LockNodeId() const { return 0; }
virtual bool HasLockedWrites() const { return false; }
////////////////////////////////////////
diff --git a/ydb/core/tx/datashard/remove_locks.cpp b/ydb/core/tx/datashard/remove_locks.cpp
new file mode 100644
index 0000000000..f05ab6d7b8
--- /dev/null
+++ b/ydb/core/tx/datashard/remove_locks.cpp
@@ -0,0 +1,68 @@
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+using namespace NLongTxService;
+
+class TDataShard::TTxRemoveLock
+ : public NTabletFlatExecutor::TTransactionBase<TDataShard>
+{
+public:
+ TTxRemoveLock(TDataShard* self, ui64 lockId)
+ : TBase(self)
+ , LockId(lockId)
+ { }
+
+ TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ // Remove any uncommitted changes with this lock id
+ // FIXME: if a distributed tx has already validated (and persisted)
+ // its locks, we must preserve uncommitted changes even when lock is
+ // removed on the originating node, since the final outcome may
+ // actually decide to commit.
+ for (const auto& pr : Self->GetUserTables()) {
+ auto localTid = pr.second->LocalTid;
+ if (txc.DB.HasOpenTx(localTid, LockId)) {
+ txc.DB.RemoveTx(localTid, LockId);
+ }
+ }
+
+ // Remove the lock from memory, it's no longer needed
+ Self->SysLocks.RemoveSubscribedLock(LockId);
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+
+private:
+ const ui64 LockId;
+};
+
+void TDataShard::Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx) {
+ auto* msg = ev->Get();
+ const ui64 lockId = msg->Record.GetLockId();
+ switch (msg->Record.GetStatus()) {
+ case NKikimrLongTxService::TEvLockStatus::STATUS_NOT_FOUND:
+ case NKikimrLongTxService::TEvLockStatus::STATUS_UNAVAILABLE:
+ Execute(new TTxRemoveLock(this, lockId), ctx);
+ break;
+
+ default:
+ break;
+ }
+}
+
+void TDataShard::SubscribeNewLocks(const TActorContext&) {
+ while (auto pendingSubscribeLock = SysLocks.NextPendingSubscribeLock()) {
+ Send(MakeLongTxServiceID(SelfId().NodeId()),
+ new TEvLongTxService::TEvSubscribeLock(
+ pendingSubscribeLock.LockId,
+ pendingSubscribeLock.LockNodeId));
+ }
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h
index 1774533dba..026ba14f41 100644
--- a/ydb/core/tx/datashard/setup_sys_locks.h
+++ b/ydb/core/tx/datashard/setup_sys_locks.h
@@ -17,6 +17,7 @@ struct TSetupSysLocks {
update.Clear();
update.LockTxId = op->LockTxId();
+ update.LockNodeId = op->LockNodeId();
if (self.IsMvccEnabled()) {
auto [readVersion, writeVersion] = self.GetReadWriteVersions(op.Get());
diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.h b/ydb/core/tx/long_tx_service/public/lock_handle.h
index fa7647d12a..84ee458871 100644
--- a/ydb/core/tx/long_tx_service/public/lock_handle.h
+++ b/ydb/core/tx/long_tx_service/public/lock_handle.h
@@ -56,6 +56,10 @@ namespace NLongTxService {
return *this;
}
+ explicit operator bool() const noexcept {
+ return bool(LockId);
+ }
+
ui64 GetLockId() const noexcept {
return LockId;
}