diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-30 11:48:12 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-30 11:48:12 +0300 |
commit | fd7cbab1745211b326c9e51338a5a98db935c4ab (patch) | |
tree | 735d3ec20f769502c8d1193da43138d2858c8ce4 | |
parent | a2a90a1a4b9bc941c2398b0ff31cbac85619ec2b (diff) | |
download | ydb-fd7cbab1745211b326c9e51338a5a98db935c4ab.tar.gz |
Faster lock removal in datashard, KIKIMR-14732
ref:a6b3f9fad1e1aca817c5603fe8b1c7bfe8b87952
39 files changed, 301 insertions, 73 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 040aabe401..6ffe5b7ebd 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -543,6 +543,7 @@ add_subdirectory(ydb/core/kqp/runtime) add_subdirectory(ydb/core/kqp/common) add_subdirectory(ydb/core/kqp/expr_nodes) add_subdirectory(ydb/library/yql/dq/expr_nodes) +add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/library/yql/dq/actors) add_subdirectory(ydb/library/yql/dq/common) add_subdirectory(ydb/core/ydb_convert) @@ -583,7 +584,6 @@ add_subdirectory(ydb/public/lib/value) add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/services/lib/sharding) -add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 71aa93dc49..fe4661448d 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -623,6 +623,7 @@ add_subdirectory(ydb/core/kqp/runtime) add_subdirectory(ydb/core/kqp/common) add_subdirectory(ydb/core/kqp/expr_nodes) add_subdirectory(ydb/library/yql/dq/expr_nodes) +add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/library/yql/dq/actors) add_subdirectory(ydb/library/yql/dq/common) add_subdirectory(ydb/core/ydb_convert) @@ -663,7 +664,6 @@ add_subdirectory(ydb/public/lib/value) add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/services/lib/sharding) -add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt index 460eefd0f9..bda9e7d47c 100644 --- a/ydb/core/kqp/common/CMakeLists.txt +++ b/ydb/core/kqp/common/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-common PUBLIC ydb-core-engine core-kqp-expr_nodes core-kqp-provider + tx-long_tx_service-public ydb-library-aclib yql-core-issue yql-dq-actors diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index d15793fd1c..7c826e2afc 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -6,6 +6,7 @@ #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/common/dq_value.h> #include <ydb/core/kqp/provider/yql_kikimr_gateway.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <library/cpp/actors/core/actorid.h> @@ -128,6 +129,7 @@ public: struct TExecPhysicalResult : public TGenericResult { NKikimrKqp::TExecuterTxResult ExecuterResult; + NLongTxService::TLockHandle LockHandle; }; struct TAstQuerySettings { diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index 8d68686b47..3e60200e46 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -7,6 +7,8 @@ #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> + #include <ydb/library/yql/dq/common/dq_value.h> #include <ydb/library/yql/utils/log/log.h> @@ -59,13 +61,14 @@ struct TKqpTxLocks { NKikimrMiniKQL::TType LockType; NKikimrMiniKQL::TListType LocksListType; THashMap<TKqpTxLock::TKey, TKqpTxLock> LocksMap; + NLongTxService::TLockHandle LockHandle; TMaybe<NYql::TIssue> LockIssue; bool HasLocks() const { return !LocksMap.empty(); } bool Broken() const { return LockIssue.Defined(); } void MarkBroken(NYql::TIssue lockIssue) { LockIssue.ConstructInPlace(std::move(lockIssue)); } - ui64 GetLockTxId() const { return HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; } + ui64 GetLockTxId() const { return LockHandle ? LockHandle.GetLockId() : HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; } size_t Size() const { return LocksMap.size(); } void ReportIssues(NYql::TExprContext& ctx) { diff --git a/ydb/core/kqp/executer/CMakeLists.txt b/ydb/core/kqp/executer/CMakeLists.txt index 0dcc0d1c5b..1147ca042e 100644 --- a/ydb/core/kqp/executer/CMakeLists.txt +++ b/ydb/core/kqp/executer/CMakeLists.txt @@ -23,6 +23,7 @@ target_link_libraries(core-kqp-executer PUBLIC core-kqp-compile core-kqp-rm ydb-core-protos + tx-long_tx_service-public ydb-core-ydb_convert ydb-library-mkql_proto library-mkql_proto-protos diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 86b1943104..da92e4df2e 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -19,6 +19,7 @@ #include <ydb/core/tx/coordinator/coordinator_impl.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/long_tx_service/public/events.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/library/yql/dq/runtime/dq_columns_resolve.h> @@ -30,6 +31,7 @@ namespace NKqp { using namespace NYql; using namespace NYql::NDq; +using namespace NLongTxService; namespace { @@ -1182,6 +1184,7 @@ private: if (lockTxId) { dataTransaction.SetLockTxId(*lockTxId); + dataTransaction.SetLockNodeId(SelfId().NodeId()); } for (auto& task : dataTransaction.GetKqpTransaction().GetTasks()) { @@ -1616,6 +1619,7 @@ private: auto lockTxId = Request.AcquireLocksTxId; if (lockTxId.Defined() && *lockTxId == 0) { lockTxId = TxId; + LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); } // first, start compute tasks @@ -1707,6 +1711,9 @@ private: } if (!Locks.empty()) { + if (LockHandle) { + ResponseEv->LockHandle = std::move(LockHandle); + } BuildLocks(*response.MutableResult()->MutableLocks(), Locks); } @@ -1849,6 +1856,9 @@ private: // Temporary storage during snapshot acquisition TVector<NDqProto::TDqTask> ComputeTasks; THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs; + + // Lock handle for a newly acquired lock + TLockHandle LockHandle; }; } // namespace diff --git a/ydb/core/kqp/executer/kqp_executer.h b/ydb/core/kqp/executer/kqp_executer.h index 13a0c10e5b..11ed832027 100644 --- a/ydb/core/kqp/executer/kqp_executer.h +++ b/ydb/core/kqp/executer/kqp_executer.h @@ -3,6 +3,7 @@ #include <ydb/core/kqp/common/kqp_common.h> #include <ydb/core/kqp/common/kqp_gateway.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/protos/config.pb.h> #include <ydb/core/protos/kqp.pb.h> @@ -14,7 +15,15 @@ struct TEvKqpExecuter { TKqpExecuterEvents::EvTxRequest> {}; struct TEvTxResponse : public TEventPB<TEvTxResponse, NKikimrKqp::TEvExecuterTxResponse, - TKqpExecuterEvents::EvTxResponse> {}; + TKqpExecuterEvents::EvTxResponse> + { + NLongTxService::TLockHandle LockHandle; + + bool IsSerializable() const override { + // We cannot serialize LockHandle, should always send locally + return false; + } + }; struct TEvStreamData : public TEventPB<TEvStreamData, NKikimrKqp::TEvExecuterStreamData, TKqpExecuterEvents::EvStreamData> {}; diff --git a/ydb/core/kqp/host/CMakeLists.txt b/ydb/core/kqp/host/CMakeLists.txt index a22010a8ea..5324cac490 100644 --- a/ydb/core/kqp/host/CMakeLists.txt +++ b/ydb/core/kqp/host/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-host PUBLIC core-kqp-opt core-kqp-prepare core-kqp-provider + tx-long_tx_service-public yql-core-services yql-minikql-invoke_builtins library-yql-sql diff --git a/ydb/core/kqp/host/kqp_run_data.cpp b/ydb/core/kqp/host/kqp_run_data.cpp index 65b57268e3..aaf3a9c6c3 100644 --- a/ydb/core/kqp/host/kqp_run_data.cpp +++ b/ydb/core/kqp/host/kqp_run_data.cpp @@ -102,7 +102,11 @@ protected: return TStatus::Async; } - bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override { + bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NLongTxService::TLockHandle&& lockHandle, TExprContext& ctx, bool commit) override { + if (lockHandle) { + TxState->Tx().Locks.LockHandle = std::move(lockHandle); + } + if (execResult.HasLocks()) { YQL_ENSURE(!commit); diff --git a/ydb/core/kqp/host/kqp_run_physical.cpp b/ydb/core/kqp/host/kqp_run_physical.cpp index a48fed6179..e4b8dfe13a 100644 --- a/ydb/core/kqp/host/kqp_run_physical.cpp +++ b/ydb/core/kqp/host/kqp_run_physical.cpp @@ -134,7 +134,7 @@ IGraphTransformer::TStatus TKqpExecutePhysicalTransformerBase::DoApplyAsyncChang TransformState->TxResults.emplace_back(std::move(txResults)); } - if (!OnExecuterResult(std::move(execResult), ctx, ExecuteFlags.HasFlags(TKqpExecuteFlag::Commit))) { + if (!OnExecuterResult(std::move(execResult), std::move(result.LockHandle), ctx, ExecuteFlags.HasFlags(TKqpExecuteFlag::Commit))) { return TStatus::Error; } diff --git a/ydb/core/kqp/host/kqp_run_physical.h b/ydb/core/kqp/host/kqp_run_physical.h index 941873955f..a5a7674ed7 100644 --- a/ydb/core/kqp/host/kqp_run_physical.h +++ b/ydb/core/kqp/host/kqp_run_physical.h @@ -3,6 +3,7 @@ #include <ydb/core/kqp/common/kqp_gateway.h> #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> #include <ydb/core/kqp/prepare/kqp_prepare.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/library/yql/core/yql_graph_transformer.h> @@ -43,7 +44,8 @@ protected: virtual TStatus DoExecute(std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit, NYql::TExprContext& ctx) = 0; virtual TStatus DoRollback() = 0; - virtual bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NYql::TExprContext& ctx, bool commit) = 0; + virtual bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, + NLongTxService::TLockHandle&& lockHandle, NYql::TExprContext& ctx, bool commit) = 0; protected: TKqpParamsMap PrepareParameters(const NKqpProto::TKqpPhyTx& tx); diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp index 70b2a661a0..a2b1db9279 100644 --- a/ydb/core/kqp/host/kqp_run_scan.cpp +++ b/ydb/core/kqp/host/kqp_run_scan.cpp @@ -56,9 +56,10 @@ protected: YQL_ENSURE(false, "Rollback in ScanQuery tx"); } - bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override { + bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NLongTxService::TLockHandle&& lockHandle, TExprContext& ctx, bool commit) override { Y_UNUSED(ctx); Y_UNUSED(commit); + Y_UNUSED(lockHandle); if (execResult.HasStats()) { TransformCtx->QueryStats.AddExecutions()->Swap(execResult.MutableStats()); diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 2e038d4bbf..bef32238ea 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -881,6 +881,7 @@ private: } result.ExecuterResult.Swap(response->MutableResult()); + result.LockHandle = std::move(ev->Get()->LockHandle); Promise.SetValue(std::move(result)); this->PassAway(); diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index fe12058da3..884da540e4 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1072,6 +1072,10 @@ public: auto& txResult = *response->MutableResult(); QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult)); + if (ev->Get()->LockHandle) { + QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle); + } + if (!MergeLocksWithTxResult(txResult)) { return; } diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index d68c7d1387..bcc87adcb1 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -410,4 +410,5 @@ enum ETxTypes { TXTYPE_APPLY_REPLICATION_CHANGES = 68 [(TxTypeOpts) = {Name: "TxApplyReplicationChanges"}]; TXTYPE_READ = 69 [(TxTypeOpts) = {Name: "TxRead"}]; TXTYPE_SPLIT_REPLICATION_SOURCE_OFFSETS = 70 [(TxTypeOpts) = {Name: "TxSplitReplicationSourceOffsets"}]; + TXTYPE_REMOVE_LOCK = 71 [(TxTypeOpts) = {Name: "TxRemoveLock"}]; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 84601ccb72..21f62e903e 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -258,6 +258,9 @@ message TDataTransaction { optional uint64 CancelDeadlineMs = 14; // Wallclock timestamp from datareq (not duration) optional uint64 CancelAfterMs = 15; // Duration from tx start local to datashard optional bool CollectStats = 16; + + // Datashard will subscribe to lock status when node id is non-zero + optional uint32 LockNodeId = 17; } message TCreateVolatileSnapshot { diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 8d3ff703fb..f30b9af363 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -37,6 +37,7 @@ target_link_libraries(core-tx-datashard PUBLIC ydb-core-protos ydb-core-tablet ydb-core-tablet_flat + tx-long_tx_service-public ydb-core-util ydb-core-wrappers ydb-core-ydb_convert @@ -187,6 +188,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_ops.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_treap.cpp @@ -281,6 +283,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC ydb-core-protos ydb-core-tablet ydb-core-tablet_flat + tx-long_tx_service-public ydb-core-util ydb-core-wrappers ydb-core-ydb_convert diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 9c7bc61c8e..2e64b818bc 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -5,6 +5,7 @@ #include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/scheme/scheme_tablecell.h> #include <ydb/core/tablet/tablet_counters_protobuf.h> +#include <ydb/core/tx/long_tx_service/public/events.h> #include <library/cpp/monlib/service/pages/templates.h> diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 8c2955a68c..cadf3cbf6d 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -280,7 +280,7 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) { /// class TDataShardEngineHost : public TEngineHost { public: - TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, TInstant now) + TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) : TEngineHost(db, counters, TEngineHostSettings(self->TabletID(), (self->State == TShardState::Readonly || self->State == TShardState::Frozen), @@ -289,6 +289,7 @@ public: , Self(self) , DB(db) , LockTxId(lockTxId) + , LockNodeId(lockNodeId) , Now(now) {} @@ -357,7 +358,7 @@ public: if (LockTxId) { // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs if (key.RowOperation != TKeyDesc::ERowOperation::Read && - (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot)) + (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot || !LockNodeId)) { key.Status = TKeyDesc::EStatus::OperationNotSupported; return false; @@ -381,7 +382,7 @@ public: return DataShardSysTable(tableId).SelectRow(row, columnIds, returnType, readTarget, holderFactory); } - Self->SysLocksTable().SetLock(tableId, row, LockTxId); + Self->SysLocksTable().SetLock(tableId, row, LockTxId, LockNodeId); Self->SetTableAccessTime(tableId, Now); return TEngineHost::SelectRow(tableId, row, columnIds, returnType, readTarget, holderFactory); @@ -394,7 +395,7 @@ public: { Y_VERIFY(!TSysTables::IsSystemTable(tableId), "SelectRange no system table is not supported"); - Self->SysLocksTable().SetLock(tableId, range, LockTxId); + Self->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId); Self->SetTableAccessTime(tableId, Now); return TEngineHost::SelectRange(tableId, range, columnIds, skipNullKeys, returnType, readTarget, @@ -550,6 +551,7 @@ private: TDataShard* Self; NTable::TDatabase& DB; const ui64& LockTxId; + const ui32& LockNodeId; bool IsImmediateTx = false; bool IsRepeatableSnapshot = false; TInstant Now; @@ -564,9 +566,10 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor std::pair<ui64, ui64> stepTxId) : StepTxId(stepTxId) , LockTxId(0) + , LockNodeId(0) { auto now = TAppData::TimeProvider->Now(); - EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, now); + EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now); EngineSettings = MakeHolder<TEngineFlatSettings>(IEngineFlat::EProtocol::V1, AppData(ctx)->FunctionRegistry, *TAppData::RandomProvider, *TAppData::TimeProvider, EngineHost.Get(), self->AllocCounters); @@ -741,10 +744,11 @@ IEngineFlat * TEngineBay::GetEngine() { return Engine.Get(); } -void TEngineBay::SetLockTxId(ui64 lockTxId) { +void TEngineBay::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { LockTxId = lockTxId; + LockNodeId = lockNodeId; if (ComputeCtx) { - ComputeCtx->SetLockTxId(lockTxId); + ComputeCtx->SetLockTxId(lockTxId, lockNodeId); } } diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 2a1374cac1..b01f64aedb 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -44,7 +44,7 @@ public: const NMiniKQL::IEngineFlat * GetEngine() const { return Engine.Get(); } NMiniKQL::IEngineFlat * GetEngine(); - void SetLockTxId(ui64 lockTxId); + void SetLockTxId(ui64 lockTxId, ui32 lockNodeId); void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; } EResult Validate() { @@ -110,6 +110,7 @@ private: TValidationInfo Info; TEngineHostCounters EngineHostCounters; ui64 LockTxId; + ui32 LockNodeId; NYql::NDq::TLogFunc KqpLogFunc; THolder<NUdf::IApplyContext> KqpApplyCtx; THolder<NMiniKQL::TKqpDatashardComputeContext> ComputeCtx; diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index a325090a9a..48882a072c 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -46,7 +46,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, "One of the fields should be set: MiniKQL, ReadTableTransaction, KqpTransaction"); if (Tx.GetLockTxId()) - EngineBay.SetLockTxId(Tx.GetLockTxId()); + EngineBay.SetLockTxId(Tx.GetLockTxId(), Tx.GetLockNodeId()); if (Tx.GetImmediate()) EngineBay.SetIsImmediateTx(); @@ -286,8 +286,14 @@ bool TValidatedDataTx::CheckCancelled() { void TValidatedDataTx::ReleaseTxData() { TxBody = ""; auto lock = Tx.GetLockTxId(); + auto lockNode = Tx.GetLockNodeId(); Tx.Clear(); - Tx.SetLockTxId(lock); + if (lock) { + Tx.SetLockTxId(lock); + } + if (lockNode) { + Tx.SetLockNodeId(lockNode); + } EngineBay.DestroyEngine(); IsReleased = true; diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 2330acacb2..d206e7d0bf 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -138,6 +138,7 @@ public: const TString& Body() const { return TxBody; } ui64 LockTxId() const { return Tx.GetLockTxId(); } + ui32 LockNodeId() const { return Tx.GetLockNodeId(); } ui64 ProgramSize() const { return Tx.GetMiniKQL().size(); } bool Immediate() const { return Tx.GetImmediate(); } bool ReadOnly() const { return Tx.GetReadOnly(); } @@ -506,6 +507,13 @@ public: return 0; } + ui32 LockNodeId() const override + { + if (DataTx) + return DataTx->LockNodeId(); + return 0; + } + bool HasLockedWrites() const override { if (DataTx) diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index acc6d2063d..5068c4c6f6 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -35,6 +35,7 @@ #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> #include <ydb/core/tablet_flat/flat_page_iface.h> +#include <ydb/core/tx/long_tx_service/public/events.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/protos/tx.pb.h> #include <ydb/core/protos/tx_datashard.pb.h> @@ -54,6 +55,7 @@ extern TStringBuf SnapshotTransferReadSetMagic; using NTabletFlatExecutor::ITransaction; using NTabletFlatExecutor::TScanOptions; +using NLongTxService::TEvLongTxService; // For CopyTable and MoveShadow class TTxTableSnapshotContext : public NTabletFlatExecutor::TTableSnapshotContext { @@ -205,6 +207,7 @@ class TDataShard class TTxCompactBorrowed; class TTxCompactTable; class TTxPersistFullCompactionTs; + class TTxRemoveLock; template <typename T> friend class TTxDirectBase; class TTxUploadRows; @@ -1024,6 +1027,8 @@ class TDataShard void Handle(TEvDataShard::TEvApplyReplicationChanges::TPtr& ev, const TActorContext& ctx); + void Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1536,6 +1541,8 @@ public: void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx); + void SubscribeNewLocks(const TActorContext &ctx); + private: /// class TLoanReturnTracker { @@ -2392,6 +2399,7 @@ protected: fFunc(TEvDataShard::EvGetReplicationSourceOffsets, HandleByReplicationSourceOffsetsServer); fFunc(TEvDataShard::EvReplicationSourceOffsetsAck, HandleByReplicationSourceOffsetsServer); fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer); + HFunc(TEvLongTxService::TEvLockStatus, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index ef92e632c5..a557d87a2b 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -145,21 +145,22 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable } void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const { - Shard->SysLocksTable().SetLock(tableId, range, LockTxId); + Shard->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId); Shard->SetTableAccessTime(tableId, Now); } void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const { - Shard->SysLocksTable().SetLock(tableId, key, LockTxId); + Shard->SysLocksTable().SetLock(tableId, key, LockTxId, LockNodeId); Shard->SetTableAccessTime(tableId, Now); } void TKqpDatashardComputeContext::BreakSetLocks() const { - Shard->SysLocksTable().BreakSetLocks(LockTxId); + Shard->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId); } -void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId) { +void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { LockTxId = lockTxId; + LockNodeId = lockNodeId; } void TKqpDatashardComputeContext::SetReadVersion(TRowVersion readVersion) { @@ -187,6 +188,7 @@ TActorId TKqpDatashardComputeContext::GetTaskOutputChannel(ui64 taskId, ui64 cha void TKqpDatashardComputeContext::Clear() { Database = nullptr; LockTxId = 0; + LockNodeId = 0; } bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidatedKey>& keys, ui64 pageFaultCount) { diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index b6dccb27cf..a0e2072be5 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -31,7 +31,7 @@ public: TString GetTablePath(const TTableId& tableId) const; const NDataShard::TUserTable* GetTable(const TTableId& tableId) const; void BreakSetLocks() const; - void SetLockTxId(ui64 lockTxId); + void SetLockTxId(ui64 lockTxId, ui32 lockNodeId); const NDataShard::TUserTable::TUserColumn& GetKeyColumnInfo( const NDataShard::TUserTable& table, ui32 keyIndex) const; @@ -108,6 +108,7 @@ private: TEngineHost& EngineHost; TInstant Now; ui64 LockTxId = 0; + ui32 LockNodeId = 0; bool PersistentChannels = false; bool TabletNotReady = false; TRowVersion ReadVersion = TRowVersion::Min(); diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index ba5de5a9a6..2726a06aa6 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -10,9 +10,10 @@ namespace NDataShard { // TLockInfo -TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId) +TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId) : Locker(locker) , LockId(lockId) + , LockNodeId(lockNodeId) , Counter(locker->IncCounter()) , CreationTime(TAppData::TimeProvider->Now()) {} @@ -108,8 +109,8 @@ void TTableLocks::BreakAllLocks(const TRowVersion& at) { // TLockLocker -TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockTxId); +TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) { + TLockInfo::TPtr lock = GetOrAddLock(lockTxId, lockNodeId); if (!lock || lock->IsBroken(at)) return lock; @@ -121,8 +122,8 @@ TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, const THashSet<TPathId> return lock; } -TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, const TPointKey& point, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockId); +TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, ui32 lockNodeId, const TPointKey& point, const TRowVersion& at) { + TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId); if (!lock || lock->IsBroken(at)) return lock; @@ -132,8 +133,8 @@ TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, const TPointKey& point, c return lock; } -TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, const TRangeKey& range, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockId); +TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TRangeKey& range, const TRowVersion& at) { + TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId); if (!lock || lock->IsBroken(at)) return lock; @@ -223,16 +224,25 @@ void TLockLocker::RemoveBrokenRanges() { } } -TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId) { +TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) { auto it = Locks.find(lockId); if (it != Locks.end()) { Limiter.TouchLock(lockId); + if (lockNodeId && !it->second->LockNodeId) { + // This shouldn't ever happen, but better safe than sorry + it->second->LockNodeId = lockNodeId; + PendingSubscribeLocks.emplace_back(lockId, lockNodeId); + } return it->second; } - TLockInfo::TPtr lock = Limiter.TryAddLock(lockId); - if (lock) + TLockInfo::TPtr lock = Limiter.TryAddLock(lockId, lockNodeId); + if (lock) { Locks[lockId] = lock; + if (lockNodeId) { + PendingSubscribeLocks.emplace_back(lockId, lockNodeId); + } + } return lock; } @@ -330,9 +340,13 @@ void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) { } } +void TLockLocker::RemoveSubscribedLock(ui64 lockId) { + RemoveLock(lockId); +} + // TLockLocker.TLockLimiter -TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId) { +TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId, ui32 lockNodeId) { #if 1 if (LocksQueue.Size() >= LockLimit()) { Parent->RemoveBrokenLocks(); @@ -352,7 +366,7 @@ TLockInfo::TPtr TLockLocker::TLockLimiter::TryAddLock(ui64 lockId) { } LocksQueue.Insert(lockId, TAppData::TimeProvider->Now()); - return TLockInfo::TPtr(new TLockInfo(Parent, lockId)); + return TLockInfo::TPtr(new TLockInfo(Parent, lockId, lockNodeId)); } void TLockLocker::TLockLimiter::RemoveLock(ui64 lockId) { @@ -414,7 +428,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { if (Update->BreakOwn) { counter = TLock::ErrorAlreadyBroken; } else if (Update->ShardLock) { - TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->AffectedTables, checkVersion); + TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->LockNodeId, Update->AffectedTables, checkVersion); if (lock) { Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); counter = lock->GetCounter(checkVersion); @@ -423,7 +437,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { } } else { for (const auto& key : Update->PointLocks) { - TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, key, checkVersion); + TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, Update->LockNodeId, key, checkVersion); if (lock) { Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); counter = lock->GetCounter(checkVersion); @@ -433,7 +447,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { } for (const auto& key : Update->RangeLocks) { - TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, key, checkVersion); + TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, Update->LockNodeId, key, checkVersion); if (lock) { Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); counter = lock->GetCounter(checkVersion); @@ -523,20 +537,20 @@ void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) { Update->EraseLock(GetLockId(key)); } -void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId) { +void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) return; if (lockTxId) { Y_VERIFY(Update); - Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId); + Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId, lockNodeId); } } -void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId) { +void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId) { if (range.Point) { // if range is point replace it with a point lock - SetLock(tableId, range.From, lockTxId); + SetLock(tableId, range.From, lockTxId, lockNodeId); return; } @@ -546,7 +560,7 @@ void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 if (lockTxId) { Y_VERIFY(Update); - Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId); + Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId, lockNodeId); } } @@ -570,11 +584,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) { Update->BreakShardLock(); } -void TSysLocks::BreakSetLocks(ui64 lockTxId) { +void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) { Y_VERIFY(Update); if (lockTxId) - Update->BreakSetLocks(lockTxId); + Update->BreakSetLocks(lockTxId, lockNodeId); } bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const { diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 19c68c4d88..e1389538f8 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -8,6 +8,7 @@ #include <ydb/core/tablet/tablet_counters.h> #include <library/cpp/cache/cache.h> +#include <util/generic/list.h> #include <util/generic/queue.h> #include <util/generic/set.h> @@ -135,6 +136,22 @@ struct TVersionedLockId { } }; +struct TPendingSubscribeLock { + ui64 LockId = 0; + ui32 LockNodeId = 0; + + TPendingSubscribeLock() = default; + + TPendingSubscribeLock(ui64 lockId, ui32 lockNodeId) + : LockId(lockId) + , LockNodeId(lockNodeId) + { } + + explicit operator bool() const { + return LockId != 0; + } +}; + /// Aggregates shard, point and range locks class TLockInfo : public TSimpleRefCount<TLockInfo> { friend class TTableLocks; @@ -143,7 +160,7 @@ class TLockInfo : public TSimpleRefCount<TLockInfo> { public: using TPtr = TIntrusivePtr<TLockInfo>; - TLockInfo(TLockLocker * locker, ui64 lockId); + TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId); ~TLockInfo(); ui64 GetCounter(const TRowVersion& at = TRowVersion::Max()) const { return !BreakVersion || at < *BreakVersion ? Counter : Max<ui64>(); } @@ -157,6 +174,7 @@ public: bool MayHavePointsAndRanges() const { return !ShardLock && (!BreakVersion || *BreakVersion); } ui64 GetLockId() const { return LockId; } + ui32 GetLockNodeId() const { return LockNodeId; } TInstant GetCreationTime() const { return CreationTime; } const THashSet<TPathId>& GetAffectedTables() const { return AffectedTables; } @@ -173,6 +191,7 @@ private: private: TLockLocker * Locker; ui64 LockId; + ui32 LockNodeId; ui64 Counter; TInstant CreationTime; THashSet<TPathId> AffectedTables; @@ -256,7 +275,7 @@ public: ui64 LocksCount() const { return Parent->LocksCount(); } - TLockInfo::TPtr TryAddLock(ui64 lockId); + TLockInfo::TPtr TryAddLock(ui64 lockId, ui32 lockNodeId); void RemoveLock(ui64 lockId); void TouchLock(ui64 lockId); @@ -280,9 +299,9 @@ public: Tables.clear(); } - TLockInfo::TPtr AddShardLock(ui64 lockTxId, const THashSet<TPathId>& affectedTables, const TRowVersion& at); - TLockInfo::TPtr AddPointLock(ui64 lockTxId, const TPointKey& key, const TRowVersion& at); - TLockInfo::TPtr AddRangeLock(ui64 lockTxId, const TRangeKey& key, const TRowVersion& at); + TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at); + TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at); + TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at); TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const; ui64 LocksCount() const { return Locks.size(); } @@ -326,6 +345,17 @@ public: // optimisation: set to remove broken lock at next Remove() void ScheduleLockCleanup(ui64 lockId, const TRowVersion& at); + TPendingSubscribeLock NextPendingSubscribeLock() { + TPendingSubscribeLock result; + if (!PendingSubscribeLocks.empty()) { + result = PendingSubscribeLocks.front(); + PendingSubscribeLocks.pop_front(); + } + return result; + } + + void RemoveSubscribedLock(ui64 lockId); + ui64 IncCounter() { return Counter++; }; private: @@ -337,6 +367,7 @@ private: TVector<ui64> CleanupPending; // LockIds of broken locks with pending cleanup TPriorityQueue<TVersionedLockId> BrokenCandidates; TPriorityQueue<TVersionedLockId> CleanupCandidates; + TList<TPendingSubscribeLock> PendingSubscribeLocks; TLockLimiter Limiter; ui64 Counter; @@ -348,7 +379,7 @@ private: void RemoveBrokenRanges(); - TLockInfo::TPtr GetOrAddLock(ui64 lockId); + TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId); void RemoveOneLock(ui64 lockId); void RemoveBrokenLocks(); }; @@ -356,6 +387,7 @@ private: /// A portion of locks update struct TLocksUpdate { ui64 LockTxId = 0; + ui32 LockNodeId = 0; TVector<TPointKey> PointLocks; TVector<TRangeKey> RangeLocks; TVector<TPointKey> PointBreaks; @@ -373,6 +405,7 @@ struct TLocksUpdate { void Clear() { LockTxId = 0; + LockNodeId = 0; ShardLock = false; ShardBreak = false; PointLocks.clear(); @@ -385,15 +418,15 @@ struct TLocksUpdate { return ShardLock || PointLocks.size() || RangeLocks.size(); } - void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId) { - Y_VERIFY(LockTxId == lockId); + void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId, ui32 lockNodeId) { + Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); AffectedTables.insert(tableId.PathId); RangeTables.insert(tableId.PathId); RangeLocks.push_back(range); } - void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId) { - Y_VERIFY(LockTxId == lockId); + void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId, ui32 lockNodeId) { + Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); AffectedTables.insert(tableId.PathId); PointLocks.push_back(key); } @@ -414,8 +447,8 @@ struct TLocksUpdate { Erases.push_back(lockId); } - void BreakSetLocks(ui64 lockId) { - Y_VERIFY(LockTxId == lockId); + void BreakSetLocks(ui64 lockId, ui32 lockNodeId) { + Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); BreakOwn = true; } }; @@ -468,11 +501,11 @@ public: ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const; TLock GetLock(const TArrayRef<const TCell>& syslockKey) const; void EraseLock(const TArrayRef<const TCell>& syslockKey); - void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId); - void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId); + void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); + void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId); void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key); void BreakAllLocks(const TTableId& tableId); - void BreakSetLocks(ui64 lockTxId); + void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId); bool IsMyKey(const TArrayRef<const TCell>& key) const; ui64 LocksCount() const { return Locker.LocksCount(); } @@ -489,6 +522,14 @@ public: return true; } + TPendingSubscribeLock NextPendingSubscribeLock() { + return Locker.NextPendingSubscribeLock(); + } + + void RemoveSubscribedLock(ui64 lockId) { + Locker.RemoveSubscribedLock(lockId); + } + private: THolder<TLocksDataShard> Self; TLockLocker Locker; diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp index 1d25c39f99..a3c54efe86 100644 --- a/ydb/core/tx/datashard/datashard_ut_locks.cpp +++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp @@ -159,12 +159,12 @@ namespace NTest { template <typename T> void SetLock(const TPointKey<T>& key) { - Locks.SetLock(TableId, key.GetRow(), LockId()); + Locks.SetLock(TableId, key.GetRow(), LockId(), 0); } template <typename T> void SetLock(const TRangeKey<T>& range) { - Locks.SetLock(TableId, range.GetRowsRange(), LockId()); + Locks.SetLock(TableId, range.GetRowsRange(), LockId(), 0); } template <typename T> @@ -173,7 +173,7 @@ namespace NTest { } void BreakSetLocks() { - Locks.BreakSetLocks(LockId()); + Locks.BreakSetLocks(LockId(), 0); } // diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index ab4092496a..be1a27b030 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1502,11 +1502,11 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { SimulateSleep(server, TDuration::Seconds(1)); - auto execSimpleRequest = [&](const TString& query) -> TString { + auto execSimpleRequest = [&](const TString& query, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) -> TString { auto reqSender = runtime.AllocateEdgeActor(); auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), expectedStatus); if (response.GetResponse().GetResults().size() == 0) { return ""; } @@ -1554,8 +1554,10 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { }; ui64 lastLockTxId = 0; + ui32 lastLockNodeId = 0; TRowVersion lastMvccSnapshot = TRowVersion::Min(); ui64 injectLockTxId = 0; + ui32 injectLockNodeId = 0; TRowVersion injectMvccSnapshot = TRowVersion::Min(); auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { switch (ev->GetTypeRewrite()) { @@ -1590,8 +1592,12 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } if (tx.GetLockTxId()) { lastLockTxId = tx.GetLockTxId(); + lastLockNodeId = tx.GetLockNodeId(); } else if (injectLockTxId) { tx.SetLockTxId(injectLockTxId); + if (injectLockNodeId) { + tx.SetLockNodeId(injectLockNodeId); + } TString txBody; Y_VERIFY(tx.SerializeToString(&txBody)); record.SetTxBody(txBody); @@ -1626,20 +1632,30 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { // We should have been acquiring locks Y_VERIFY(lastLockTxId != 0); ui64 snapshotLockTxId = lastLockTxId; + ui32 snapshotLockNodeId = lastLockNodeId; Y_VERIFY(lastMvccSnapshot); auto snapshotVersion = lastMvccSnapshot; // Perform an immediate write, pretending it happens as part of the above snapshot tx injectLockTxId = snapshotLockTxId; + injectLockNodeId = snapshotLockNodeId; injectMvccSnapshot = snapshotVersion; UNIT_ASSERT_VALUES_EQUAL( execSimpleRequest(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2) - )")), + )"), + UseNewEngine ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::UNAVAILABLE), ""); injectLockTxId = 0; + injectLockNodeId = 0; injectMvccSnapshot = TRowVersion::Min(); + // Old engine doesn't support LockNodeId + // There's nothing to test unless we can write uncommitted data + if (!UseNewEngine) { + return; + } + // Start another snapshot read, it should not see above write (it's uncommitted) TString sessionId2, txId2; UNIT_ASSERT_VALUES_EQUAL( diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index d477ca5514..d237bbdbcb 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -41,6 +41,7 @@ public: op->ChangeRecords() = std::move(tx->GetCollectedChanges()); DataShard.SysLocksTable().ApplyLocks(); + DataShard.SubscribeNewLocks(ctx); Pipeline.AddCommittingOp(op); return EExecutionStatus::DelayCompleteNoMoreRestarts; diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp index 5a09528adc..0ae61b8af8 100644 --- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp @@ -23,7 +23,7 @@ public: return !op->HasRuntimeConflicts(); } - EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override { + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { Y_VERIFY(op->IsCommitWritesTx()); TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); @@ -46,6 +46,7 @@ public: BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); DataShard.SysLocksTable().ApplyLocks(); + DataShard.SubscribeNewLocks(ctx); Pipeline.AddCommittingOp(op); return EExecutionStatus::ExecutedNoMoreRestarts; diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 9f6c59189b..b6ac426d25 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -24,7 +24,7 @@ public: private: void ExecuteDataTx(TOperation::TPtr op, const TActorContext& ctx); - void AddLocksToResult(TOperation::TPtr op); + void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx); }; TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard, @@ -254,15 +254,15 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, } if (counters.InvisibleRowSkips) { - DataShard.SysLocksTable().BreakSetLocks(op->LockTxId()); + DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId()); } - AddLocksToResult(op); + AddLocksToResult(op, ctx); Pipeline.AddCommittingOp(op); } -void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) { +void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) { auto locks = DataShard.SysLocksTable().ApplyLocks(); for (const auto& lock : locks) { if (lock.IsError()) { @@ -273,6 +273,7 @@ void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) { op->Result()->AddTxLock(lock.LockId, lock.DataShard, lock.Generation, lock.Counter, lock.SchemeShard, lock.PathId); } + DataShard.SubscribeNewLocks(ctx); } void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) { diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index f16db8eaa3..bc6fda112e 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -29,7 +29,7 @@ public: return !op->HasRuntimeConflicts(); } - EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override { + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { Y_VERIFY(op->IsDistributedEraseTx()); TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); @@ -82,6 +82,7 @@ public: BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); DataShard.SysLocksTable().ApplyLocks(); + DataShard.SubscribeNewLocks(ctx); Pipeline.AddCommittingOp(op); return EExecutionStatus::ExecutedNoMoreRestarts; diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index d1e02434d0..b60562af1a 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -137,6 +137,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio KqpRollbackLockChanges(tabletId, tx, DataShard, txc); KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); DataShard.SysLocksTable().ApplyLocks(); + DataShard.SubscribeNewLocks(ctx); return EExecutionStatus::Executed; } @@ -181,7 +182,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); if (dataTx->GetCounters().InvisibleRowSkips) { - DataShard.SysLocksTable().BreakSetLocks(op->LockTxId()); + DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId()); } AddLocksToResult(op, ctx); @@ -242,6 +243,7 @@ void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorCo LOG_T("add lock to result: " << op->Result()->Record.GetTxLocks().rbegin()->ShortDebugString()); } + DataShard.SubscribeNewLocks(ctx); } EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 3d330806c7..167f265f6a 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -672,6 +672,7 @@ public: return GetKeysInfo().ReadsCount + GetKeysInfo().WritesCount; } virtual ui64 LockTxId() const { return 0; } + virtual ui32 LockNodeId() const { return 0; } virtual bool HasLockedWrites() const { return false; } //////////////////////////////////////// diff --git a/ydb/core/tx/datashard/remove_locks.cpp b/ydb/core/tx/datashard/remove_locks.cpp new file mode 100644 index 0000000000..f05ab6d7b8 --- /dev/null +++ b/ydb/core/tx/datashard/remove_locks.cpp @@ -0,0 +1,68 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +using namespace NLongTxService; + +class TDataShard::TTxRemoveLock + : public NTabletFlatExecutor::TTransactionBase<TDataShard> +{ +public: + TTxRemoveLock(TDataShard* self, ui64 lockId) + : TBase(self) + , LockId(lockId) + { } + + TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + // Remove any uncommitted changes with this lock id + // FIXME: if a distributed tx has already validated (and persisted) + // its locks, we must preserve uncommitted changes even when lock is + // removed on the originating node, since the final outcome may + // actually decide to commit. + for (const auto& pr : Self->GetUserTables()) { + auto localTid = pr.second->LocalTid; + if (txc.DB.HasOpenTx(localTid, LockId)) { + txc.DB.RemoveTx(localTid, LockId); + } + } + + // Remove the lock from memory, it's no longer needed + Self->SysLocks.RemoveSubscribedLock(LockId); + + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + +private: + const ui64 LockId; +}; + +void TDataShard::Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx) { + auto* msg = ev->Get(); + const ui64 lockId = msg->Record.GetLockId(); + switch (msg->Record.GetStatus()) { + case NKikimrLongTxService::TEvLockStatus::STATUS_NOT_FOUND: + case NKikimrLongTxService::TEvLockStatus::STATUS_UNAVAILABLE: + Execute(new TTxRemoveLock(this, lockId), ctx); + break; + + default: + break; + } +} + +void TDataShard::SubscribeNewLocks(const TActorContext&) { + while (auto pendingSubscribeLock = SysLocks.NextPendingSubscribeLock()) { + Send(MakeLongTxServiceID(SelfId().NodeId()), + new TEvLongTxService::TEvSubscribeLock( + pendingSubscribeLock.LockId, + pendingSubscribeLock.LockNodeId)); + } +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h index 1774533dba..026ba14f41 100644 --- a/ydb/core/tx/datashard/setup_sys_locks.h +++ b/ydb/core/tx/datashard/setup_sys_locks.h @@ -17,6 +17,7 @@ struct TSetupSysLocks { update.Clear(); update.LockTxId = op->LockTxId(); + update.LockNodeId = op->LockNodeId(); if (self.IsMvccEnabled()) { auto [readVersion, writeVersion] = self.GetReadWriteVersions(op.Get()); diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.h b/ydb/core/tx/long_tx_service/public/lock_handle.h index fa7647d12a..84ee458871 100644 --- a/ydb/core/tx/long_tx_service/public/lock_handle.h +++ b/ydb/core/tx/long_tx_service/public/lock_handle.h @@ -56,6 +56,10 @@ namespace NLongTxService { return *this; } + explicit operator bool() const noexcept { + return bool(LockId); + } + ui64 GetLockId() const noexcept { return LockId; } |