diff options
author | snaury <snaury@ydb.tech> | 2022-12-20 13:24:24 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-12-20 13:24:24 +0300 |
commit | 0b3d10fb0817ad9b03497ef999ffc235ddac29fe (patch) | |
tree | 53dc86dffc2a34c5af2846ead030814f7c1ae1e7 | |
parent | 58e50341cc234a4291453eea2ccd0ab451f07c2f (diff) | |
download | ydb-0b3d10fb0817ad9b03497ef999ffc235ddac29fe.tar.gz |
Implement execution of volatile transactions
32 files changed, 1142 insertions, 100 deletions
diff --git a/ydb/core/base/pathid.h b/ydb/core/base/pathid.h index e974f846134..711c01593f9 100644 --- a/ydb/core/base/pathid.h +++ b/ydb/core/base/pathid.h @@ -26,6 +26,11 @@ struct TPathId { { } + template<typename H> + friend H AbslHashValue(H h, const TPathId& pathId) { + return H::combine(std::move(h), pathId.OwnerId, pathId.LocalPathId); + } + ui64 Hash() const; TString ToString() const; void Out(IOutputStream& o) const; diff --git a/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt b/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt index f621755b9ed..75f886c323d 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-client-minikql_compile diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt index 425b63a036e..72d59cdd50d 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-client-minikql_compile diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux.txt index 425b63a036e..72d59cdd50d 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.linux.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.linux.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-client-minikql_compile diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp 
index c9a849a16a7..db861f006d2 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -28,6 +28,8 @@ #include <ydb/library/yql/dq/tasks/dq_connection_builder.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> + namespace NKikimr { namespace NKqp { @@ -1358,7 +1360,8 @@ private: SelfId(), TxId, dataTransaction.SerializeAsString(), - ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0); + (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0) | + (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0)); } auto traceId = ExecuterSpan.GetTraceId(); @@ -1645,6 +1648,11 @@ private: // Single-shard transactions are always immediate ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1; + if (ImmediateTx) { + // Transaction cannot be both immediate and volatile + YQL_ENSURE(!VolatileTx); + } + switch (Request.IsolationLevel) { // OnlineRO with AllowInconsistentReads = true case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED: @@ -1652,6 +1660,7 @@ private: // (legacy behaviour, for compatibility with current execution engine) case NKikimrKqp::ISOLATION_LEVEL_READ_STALE: YQL_ENSURE(ReadOnlyTx); + YQL_ENSURE(!VolatileTx); ImmediateTx = true; break; @@ -1661,6 +1670,7 @@ private: if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) { // Snapshot reads are always immediate + YQL_ENSURE(!VolatileTx); Snapshot = Request.Snapshot; ImmediateTx = true; } @@ -1707,6 +1717,7 @@ private: AppData()->FeatureFlags.GetEnableMvccSnapshotReads()); if (forceSnapshot) { + YQL_ENSURE(!VolatileTx); auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); @@ -1810,52 +1821,111 @@ private: Request.TopicOperations.BuildTopicTxs(topicTxs); - const bool useGenericReadSets 
= AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || !topicTxs.empty(); + const bool needRollback = Request.EraseLocks && !Request.ValidateLocks; + + const bool uncommittedWrites = AppData()->FeatureFlags.GetEnableKqpImmediateEffects() && Request.Snapshot.IsValid(); + + VolatileTx = ( + // We want to use volatile transactions only when the feature is enabled + AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions() && + // We don't want volatile tx when acquiring locks + !Request.AcquireLocksTxId && + // We don't want readonly volatile transactions + !ReadOnlyTx && + // We only want to use volatile transactions with side-effects + !ShardsWithEffects.empty() && + // We don't want to use volatile transactions when doing a rollback + !needRollback && + // We cannot use volatile transaction for uncommitted writes + !uncommittedWrites && + // We cannot use volatile transactions with topics + // TODO: add support in the future + topicTxs.empty() && + // We only want to use volatile transactions for multiple shards + (datashardTasks.size() + topicTxs.size()) > 1 && + // We cannot use volatile transactions with persistent channels + // Note: currently persistent channels are never used + !HasPersistentChannels); + + const bool useGenericReadSets = ( + // Use generic readsets when feature is explicitly enabled + AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || + // Volatile transactions must always use generic readsets + VolatileTx || + // Transactions with topics must always use generic readsets + !topicTxs.empty()); + + if (auto locksMap = ExtractLocks(Request.Locks); + !locksMap.empty() || + VolatileTx || + Request.TopicOperations.HasReadOperations()) + { + YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks || VolatileTx); - if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty() || Request.TopicOperations.HasReadOperations()) { - YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks); + bool needCommit = ( + 
(Request.ValidateLocks && Request.EraseLocks) || + VolatileTx); - auto locksOp = Request.ValidateLocks && Request.EraseLocks + auto locksOp = needCommit ? NKikimrTxDataShard::TKqpLocks::Commit : (Request.ValidateLocks ? NKikimrTxDataShard::TKqpLocks::Validate : NKikimrTxDataShard::TKqpLocks::Rollback); - TSet<ui64> taskShardIds; - if (Request.ValidateLocks) { + absl::flat_hash_set<ui64> sendingShardsSet; + absl::flat_hash_set<ui64> receivingShardsSet; + + // Gather shards that need to send/receive readsets (shards with effects) + if (needCommit) { for (auto& [shardId, _] : datashardTasks) { if (ShardsWithEffects.contains(shardId)) { - taskShardIds.insert(shardId); + // Volatile transactions may abort effects, so they send readsets + if (VolatileTx) { + sendingShardsSet.insert(shardId); + } + receivingShardsSet.insert(shardId); } } - taskShardIds.merge(Request.TopicOperations.GetReceivingTabletIds()); + if (auto tabletIds = Request.TopicOperations.GetSendingTabletIds()) { + sendingShardsSet.insert(tabletIds.begin(), tabletIds.end()); + } + + if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) { + receivingShardsSet.insert(tabletIds.begin(), tabletIds.end()); + } } - TSet<ui64> locksSendingShards; + // Gather locks that need to be committed or erased for (auto& [shardId, locksList] : locksMap) { auto& tx = datashardTxs[shardId]; tx.MutableLocks()->SetOp(locksOp); - for (auto& lock : locksList) { - tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock); - } + if (!locksList.empty()) { + auto* protoLocks = tx.MutableLocks()->MutableLocks(); + protoLocks->Reserve(locksList.size()); + for (auto& lock : locksList) { + protoLocks->Add()->Swap(&lock); + } - if (!locksList.empty() && Request.ValidateLocks) { - locksSendingShards.insert(shardId); + if (needCommit) { + // We also send the result on commit + sendingShardsSet.insert(shardId); + } } } - locksSendingShards.merge(Request.TopicOperations.GetSendingTabletIds()); + if (Request.ValidateLocks || 
needCommit) { + NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end()); + NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end()); - if (Request.ValidateLocks) { - NProtoBuf::RepeatedField<ui64> sendingShards(locksSendingShards.begin(), locksSendingShards.end()); - NProtoBuf::RepeatedField<ui64> receivingShards(taskShardIds.begin(), taskShardIds.end()); + std::sort(sendingShards.begin(), sendingShards.end()); + std::sort(receivingShards.begin(), receivingShards.end()); for (auto& [shardId, shardTx] : datashardTxs) { shardTx.MutableLocks()->SetOp(locksOp); - shardTx.MutableLocks()->MutableSendingShards()->CopyFrom(sendingShards); - shardTx.MutableLocks()->MutableReceivingShards()->CopyFrom(receivingShards); + *shardTx.MutableLocks()->MutableSendingShards() = sendingShards; + *shardTx.MutableLocks()->MutableReceivingShards() = receivingShards; } for (auto& [_, tx] : topicTxs) { @@ -1869,12 +1939,12 @@ private: case NKikimrTxDataShard::TKqpLocks::Rollback: tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback); break; - default: + case NKikimrTxDataShard::TKqpLocks::Unspecified: break; } - tx.MutableSendingShards()->CopyFrom(sendingShards); - tx.MutableReceivingShards()->CopyFrom(receivingShards); + *tx.MutableSendingShards() = sendingShards; + *tx.MutableReceivingShards() = receivingShards; } } } @@ -2009,6 +2079,7 @@ private: << ", readonly: " << ReadOnlyTx << ", datashardTxs: " << DatashardTxs.size() << ", topicTxs: " << Request.TopicOperations.GetSize() + << ", volatile: " << VolatileTx << ", immediate: " << ImmediateTx << ", remote tasks" << remoteComputeTasksCnt << ", useFollowers: " << UseFollowers); @@ -2028,6 +2099,7 @@ private: auto lockTxId = Request.AcquireLocksTxId; if (lockTxId.Defined() && *lockTxId == 0) { lockTxId = TxId; + LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); } for (auto& tx : topicTxs) { @@ -2190,6 +2262,7 @@ private: 
TVector<NKikimrTxDataShard::TLock> Locks; TVector<TKqpExecuterTxResult> Results; bool ReadOnlyTx = true; + bool VolatileTx = false; bool ImmediateTx = false; bool UseFollowers = false; bool TxPlanned = false; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f1873b966db..a48324b33db 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -759,6 +759,7 @@ message TFeatureFlags { optional bool EnableAlterDatabaseCreateHiveFirst = 82 [default = false]; reserved 83; // EnableKqpDataQuerySourceRead optional bool EnableSmallDiskOptimization = 84 [default = true]; + optional bool EnableDataShardVolatileTransactions = 85 [default = false]; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 8aaa9ec8019..2c37a991a78 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1798,3 +1798,18 @@ message TEvReplicationSourceOffsetsCancel { // Cancels a previously started read for (Sender, ReadId) optional uint64 ReadId = 1; } + +message TTxVolatileDetails { + optional uint64 TxId = 1; + + // Step and TxId that should be used for the eventual commit + optional uint64 VersionStep = 2; + optional uint64 VersionTxId = 3; + + // A list of local db transactions that need to be committed on success or rolled back on failure + repeated uint64 CommitTxIds = 4 [packed = true]; + + // A list of volatile transaction ids that need to be committed or rolled back before this transaction commits. + // This is because local db changes must be committed in the same order they performed writes. 
+ repeated uint64 Dependencies = 5 [packed = true]; +} diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h index 85ca8f929a0..a15d333964f 100644 --- a/ydb/core/scheme/scheme_tabledefs.h +++ b/ydb/core/scheme/scheme_tabledefs.h @@ -89,6 +89,11 @@ struct TTableId { return !operator==(x); } + template<typename H> + friend H AbslHashValue(H h, const TTableId& tableId) { + return H::combine(std::move(h), tableId.PathId, tableId.SysViewInfo, tableId.SchemaVersion); + } + ui64 Hash() const noexcept { auto hash = PathId.Hash(); if (SysViewInfo) { diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 8cb61e0fee7..a2cd6cc3291 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -620,6 +620,13 @@ bool TDatabase::ValidateCommit(TString &err) return true; } +bool TDatabase::HasChanges() const +{ + Y_VERIFY(Redo, "Transaction is not in progress"); + + return *Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions; +} + TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator *cookieAllocator) { TempIterators.clear(); @@ -636,7 +643,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator IteratedTables.clear(); } - if (commit && (*Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions)) { + if (commit && HasChanges()) { Y_VERIFY(stamp >= Change->Stamp); Y_VERIFY(DatabaseImpl->Serial() == Change->Serial); diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index a30e4190fa8..96b194ac746 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -196,6 +196,11 @@ public: TKeyRangeCache* DebugGetTableErasedKeysCache(ui32 table) const; + /** + * Returns true when current transaction has changes to commit + */ + bool HasChanges() const; + // executor interface void Begin(TTxStamp, IPages& 
env); TProd Commit(TTxStamp, bool commit, TCookieAllocator* = nullptr); diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h index b2f73153c20..423f172ae28 100644 --- a/ydb/core/tablet_flat/flat_table_committed.h +++ b/ydb/core/tablet_flat/flat_table_committed.h @@ -138,6 +138,51 @@ namespace NTable { }; /** + * A special implementation of a dynamic transaction map with a possible base + */ + class TDynamicTransactionMap final : public ITransactionMap { + public: + TDynamicTransactionMap() = default; + + explicit TDynamicTransactionMap(ITransactionMapPtr base) + : Base(std::move(base)) + { } + + public: + const TRowVersion* Find(ui64 txId) const override { + auto it = Values.find(txId); + if (it != Values.end()) { + return &it->second; + } + return Base.Find(txId); + } + + void SetBase(ITransactionMapPtr base) { + Base = std::move(base); + } + + void Add(ui64 txId, TRowVersion version) { + Values[txId] = version; + } + + void Remove(ui64 txId) { + Values.erase(txId); + } + + void Clear() { + Values.clear(); + } + + bool Empty() const { + return Values.empty() && !Base; + } + + private: + ITransactionMapPtr Base; + absl::flat_hash_map<ui64, TRowVersion> Values; + }; + + /** * A simple copy-on-write data structure for a TxId -> RowVersion map * * Pretends to be an instance of ITransactionMapPtr diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index e8c17c91489..48eeae29cb3 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -44,6 +44,7 @@ public: FEATURE_FLAG_SETTER(EnableKqpImmediateEffects) FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets) FEATURE_FLAG_SETTER(EnableAlterDatabaseCreateHiveFirst) + FEATURE_FLAG_SETTER(EnableDataShardVolatileTransactions) TDerived& SetEnableMvcc(std::optional<bool> value) { if (value) { diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt 
index 5658b055b84..474414f0000 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt @@ -48,6 +48,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata @@ -234,6 +235,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp @@ -300,6 +302,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index ec538ef3c0b..8a3825a0dc1 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -49,6 +49,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata @@ -235,6 +236,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp @@ -302,6 +304,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt index ec538ef3c0b..8a3825a0dc1 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux.txt @@ -49,6 +49,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata @@ -235,6 +236,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp @@ -302,6 +304,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource contrib-libs-zstd cpp-actors-core + cpp-containers-absl_flat_hash cpp-containers-flat_hash cpp-digest-md5 cpp-html-pcdata diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 757ddf7068c..59184c1707d 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -125,6 +125,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , SysLocks(this) , SnapshotManager(this) , SchemaSnapshotManager(this) + , VolatileTxManager(this) , DisableByKeyFilter(0, 0, 1) , 
MaxTxInFly(15000, 0, 100000) , MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll) diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 46b2370d316..eebeb026f80 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -245,6 +245,10 @@ public: return ReadVersion; } + void SetVolatileTxId(ui64 txId) { + VolatileTxId = txId; + } + void SetIsImmediateTx() { IsImmediateTx = true; } @@ -254,12 +258,12 @@ public: } IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override { - auto it = ChangeCollectors.find(tableId); + auto it = ChangeCollectors.find(tableId.PathId); if (it != ChangeCollectors.end()) { return it->second.Get(); } - it = ChangeCollectors.emplace(tableId, nullptr).first; + it = ChangeCollectors.emplace(tableId.PathId, nullptr).first; if (!Self->IsUserTable(tableId)) { return it->second.Get(); } @@ -276,6 +280,19 @@ public: return; } + if (VolatileTxId) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); + if (VolatileCommitTxIds.insert(lockId).second) { + // Update TxMap to include the new commit + auto it = TxMaps.find(tableId.PathId); + if (it != TxMaps.end()) { + it->second->Add(lockId, WriteVersion); + } + } + return; + } + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); DB.CommitTx(localTid, lockId, writeVersion); @@ -314,6 +331,32 @@ public: } } + TVector<ui64> GetVolatileCommitTxIds() const { + TVector<ui64> commitTxIds; + + if (!VolatileCommitTxIds.empty()) { + commitTxIds.reserve(VolatileCommitTxIds.size()); + for (ui64 commitTxId : VolatileCommitTxIds) { + commitTxIds.push_back(commitTxId); + } + } + + return commitTxIds; + } + + 
TVector<ui64> GetVolatileDependencies() const { + TVector<ui64> dependencies; + + if (!VolatileDependencies.empty()) { + dependencies.reserve(VolatileDependencies.size()); + for (ui64 dependency : VolatileDependencies) { + dependencies.push_back(dependency); + } + } + + return dependencies; + } + bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override { if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); @@ -490,29 +533,52 @@ public: if (TSysTables::IsSystemTable(tableId)) return 0; + if (VolatileTxId) { + Y_VERIFY(!LockTxId); + VolatileCommitTxIds.insert(VolatileTxId); + return VolatileTxId; + } + return LockTxId; } NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const override { - if (TSysTables::IsSystemTable(tableId) || !LockTxId) + if (TSysTables::IsSystemTable(tableId)) { + return nullptr; + } + + auto baseTxMap = Self->GetVolatileTxManager().GetTxMap(); + if (!baseTxMap && !VolatileTxId && !LockTxId) { + // Don't use tx map when there's nothing we want to view as committed return nullptr; + } // Don't use tx map when we know there's no write lock for a table // Note: currently write lock implies uncommitted changes - if (!Self->SysLocksTable().HasCurrentWriteLock(tableId)) { + if (!baseTxMap && !VolatileTxId && LockTxId && !Self->SysLocksTable().HasCurrentWriteLock(tableId)) { return nullptr; } - auto& ptr = TxMaps[tableId]; + auto& ptr = TxMaps[tableId.PathId]; if (!ptr) { - // Uncommitted changes are visible in all possible snapshots - ptr = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + ptr = new NTable::TDynamicTransactionMap(baseTxMap); + if (LockTxId) { + // Uncommitted changes are visible in all possible snapshots + ptr->Add(LockTxId, TRowVersion::Min()); + } else if (VolatileTxId) { + // We want volatile changes to be visible at the write version + ptr->Add(VolatileTxId, WriteVersion); + for (ui64 commitTxId : VolatileCommitTxIds) { + 
ptr->Add(commitTxId, WriteVersion); + } + } } return ptr; } NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override { + // TODO: handle volatile transactions if (TSysTables::IsSystemTable(tableId) || !LockTxId) return nullptr; @@ -522,7 +588,7 @@ public: return nullptr; } - auto& ptr = TxObservers[tableId]; + auto& ptr = TxObservers[tableId.PathId]; if (!ptr) { // This observer is supposed to find conflicts ptr = new TReadTxObserver(this, tableId); @@ -558,6 +624,7 @@ public: } void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override { + // TODO: handle volatile read dependencies Host->CheckReadConflict(TableId, rowVersion); } @@ -644,6 +711,8 @@ public: } void OnSkipUncommitted(ui64 txId) override { + // TODO: we need to somehow remember that this lock must add a + // dependency to some other volatile tx at commit time. if (!Host->Db.HasRemovedTx(LocalTid, txId)) { ++SkipCount; if (!SelfFound) { @@ -688,6 +757,9 @@ public: } void OnSkipUncommitted(ui64 txId) override { + // TODO: handle volatile write dependencies + // Note that all active volatile transactions will be uncommitted + // here, since we are not using tx map for conflict detection. 
Host->BreakWriteConflict(txId); } @@ -734,10 +806,13 @@ private: TInstant Now; TRowVersion WriteVersion = TRowVersion::Max(); TRowVersion ReadVersion = TRowVersion::Min(); - THashSet<ui64> CommittedLockChanges; - mutable THashMap<TTableId, THolder<IDataShardChangeCollector>> ChangeCollectors; - mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps; - mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers; + ui64 VolatileTxId = 0; + absl::flat_hash_set<ui64> CommittedLockChanges; + mutable absl::flat_hash_map<TPathId, THolder<IDataShardChangeCollector>> ChangeCollectors; + mutable absl::flat_hash_map<TPathId, TIntrusivePtr<NTable::TDynamicTransactionMap>> TxMaps; + mutable absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers; + mutable absl::flat_hash_set<ui64> VolatileCommitTxIds; + mutable absl::flat_hash_set<ui64> VolatileDependencies; }; // @@ -908,6 +983,13 @@ void TEngineBay::SetReadVersion(TRowVersion readVersion) { ComputeCtx->SetReadVersion(readVersion); } +void TEngineBay::SetVolatileTxId(ui64 txId) { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + host->SetVolatileTxId(txId); +} + void TEngineBay::SetIsImmediateTx() { Y_VERIFY(EngineHost); @@ -941,6 +1023,20 @@ void TEngineBay::ResetCollectedChanges() { host->ResetCollectedChanges(); } +TVector<ui64> TEngineBay::GetVolatileCommitTxIds() const { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + return host->GetVolatileCommitTxIds(); +} + +TVector<ui64> TEngineBay::GetVolatileDependencies() const { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + return host->GetVolatileDependencies(); +} + IEngineFlat * TEngineBay::GetEngine() { if (!Engine) { Engine = CreateEngineFlat(*EngineSettings); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 
9d8ac1ba6f3..6a4b877b7d2 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -99,6 +99,7 @@ public: void SetWriteVersion(TRowVersion writeVersion); void SetReadVersion(TRowVersion readVersion); + void SetVolatileTxId(ui64 txId); void SetIsImmediateTx(); void SetIsRepeatableSnapshot(); @@ -107,6 +108,9 @@ public: TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const; void ResetCollectedChanges(); + TVector<ui64> GetVolatileCommitTxIds() const; + TVector<ui64> GetVolatileDependencies() const; + void ResetCounters() { EngineHostCounters = TEngineHostCounters(); } const TEngineHostCounters& GetCounters() const { return EngineHostCounters; } diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 626b9afa5b9..fac931dff73 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -171,6 +171,8 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { PRECHARGE_SYS_TABLE(Schema::Locks); PRECHARGE_SYS_TABLE(Schema::LockRanges); PRECHARGE_SYS_TABLE(Schema::LockConflicts); + PRECHARGE_SYS_TABLE(Schema::TxVolatileDetails); + PRECHARGE_SYS_TABLE(Schema::TxVolatileParticipants); if (!ready) return false; @@ -514,6 +516,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } + if (Self->State != TShardState::Offline) { + if (!Self->VolatileTxManager.Load(db)) { + return false; + } + } + Self->SubscribeNewLocks(); Self->ScheduleRemoveAbandonedLockChanges(); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 70f041a9787..4ac1ac3082f 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -761,6 +761,25 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded) } plan.push_back(EExecutionUnitKind::FinishPropose); 
plan.push_back(EExecutionUnitKind::CompletedOperations); + } else if (HasVolatilePrepareFlag()) { + Y_VERIFY(!loaded); + plan.push_back(EExecutionUnitKind::CheckDataTx); + plan.push_back(EExecutionUnitKind::StoreDataTx); // note: stores in memory + plan.push_back(EExecutionUnitKind::FinishPropose); + Y_VERIFY(!GetStep()); + plan.push_back(EExecutionUnitKind::WaitForPlan); + plan.push_back(EExecutionUnitKind::PlanQueue); + plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory + plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); + Y_VERIFY(IsKqpDataTransaction()); + // Note: execute will also prepare and send readsets + plan.push_back(EExecutionUnitKind::ExecuteKqpDataTx); + // Note: it is important that plan here is the same as regular + // distributed tx, since normal tx may decide to commit in a + // volatile manner with dependencies, to avoid waiting for + // locked keys to resolve. + plan.push_back(EExecutionUnitKind::CompleteOperation); + plan.push_back(EExecutionUnitKind::CompletedOperations); } else { if (!loaded) { plan.push_back(EExecutionUnitKind::CheckDataTx); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 8a1c715d75e..6acc43e9afb 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -179,6 +179,7 @@ public: void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); } void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); } + void SetVolatileTxId(ui64 txId) { EngineBay.SetVolatileTxId(txId); } void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) { EngineBay.CommitChanges(tableId, lockId, writeVersion, txc); @@ -187,6 +188,9 @@ public: TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return 
EngineBay.GetCollectedChanges(); } void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); } + TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); } + TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); } + TActorId Source() const { return Source_; } void SetSource(const TActorId& actorId) { Source_ = actorId; } void SetStep(ui64 step) { StepTxId_.Step = step; } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 04bdf2a5afc..f23838a66ad 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -18,6 +18,7 @@ #include "change_record.h" #include "progress_queue.h" #include "read_iterator.h" +#include "volatile_tx.h" #include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> @@ -258,6 +259,7 @@ class TDataShard friend class NMiniKQL::TKqpScanComputeContext; friend class TSnapshotManager; friend class TSchemaSnapshotManager; + friend class TVolatileTxManager; friend class TReplicationSourceOffsetsClient; friend class TReplicationSourceOffsetsServer; @@ -870,13 +872,44 @@ class TDataShard >; }; + // Describes a volatile transaction that executed and possibly made + // some changes, but is not fully committed, waiting for decision from + // other participants. Transaction would be committed at Step:TxId on + // success, and usually has 1-2 uncommitted TxIds that need to be + // committed. 
+ struct TxVolatileDetails : Table<32> { + // Volatile TxId of the transaction + struct TxId : Column<1, NScheme::NTypeIds::Uint64> {}; + // State of transaction, initially undecided, but becomes committed or aborted until it is removed + struct State : Column<2, NScheme::NTypeIds::Uint32> { using Type = EVolatileTxState; }; + // Transaction details encoded in a protobuf message + struct Details : Column<3, NScheme::NTypeIds::String> { using Type = NKikimrTxDataShard::TTxVolatileDetails; }; + + using TKey = TableKey<TxId>; + using TColumns = TableColumns<TxId, State, Details>; + }; + + // Associated participants for a volatile transaction that need to + // decide a transaction and from which a readset is expected. Usually a + // COMMIT decision from a participant causes removal of a corresponding + // row, and ABORT decision causes full transaction abort, with removal + // of all corresponding rows. + struct TxVolatileParticipants : Table<33> { + struct TxId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct ShardId : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<TxId, ShardId>; + using TColumns = TableColumns<TxId, ShardId>; + }; + using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue, DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress, Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts, SrcChangeSenderActivations, DstChangeSenderActivations, ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived, UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts, - LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits>; + LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits, + TxVolatileDetails, TxVolatileParticipants>; // These settings are persisted on each Init. 
So we use empty settings in order not to overwrite what // was changed by the user @@ -1614,6 +1647,10 @@ public: void AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId, TTransactionContext& txc, const TActorContext& ctx); + TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; } + const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; } + + template <typename... Args> bool PromoteCompleteEdge(Args&&... args) { return SnapshotManager.PromoteCompleteEdge(std::forward<Args>(args)...); @@ -2235,6 +2272,7 @@ private: TSnapshotManager SnapshotManager; TSchemaSnapshotManager SchemaSnapshotManager; + TVolatileTxManager VolatileTxManager; TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index c0188834c50..08c734b9621 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -660,7 +660,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); - if (kqpTx.HasLocks() && !NeedValidateLocks(kqpTx.GetLocks().GetOp())) { + if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) { return true; } @@ -730,6 +730,132 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) return true; } +bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { + auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + + if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) { + return true; + } + + // Volatile transactions cannot work with non-generic readsets + YQL_ENSURE(kqpTx.GetUseGenericReadSets()); + + // We may have some stale data since before the restart + tx->OutReadSets().clear(); + 
tx->AwaitingDecisions().clear(); + + // Note: usually all shards send locks, since they either have side effects or need to validate locks + // However it is technically possible to have pure-read shards, that don't contribute to the final decision + bool sendLocks = SendLocks(kqpTx.GetLocks(), origin); + if (sendLocks) { + // Note: it is possible to have no locks + auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin); + + if (!brokenLocks.empty()) { + tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>( + NKikimrTxDataShard::TX_KIND_DATA, + origin, + tx->GetTxId(), + NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); + + auto* protoLocks = tx->Result()->Record.MutableTxLocks(); + for (auto& brokenLock : brokenLocks) { + protoLocks->Add()->Swap(&brokenLock); + } + + return false; + } + + // We need to form decision readsets for all other participants + for (ui64 dstTabletId : kqpTx.GetLocks().GetReceivingShards()) { + if (dstTabletId == origin) { + // Don't send readsets to ourselves + continue; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Send commit decision from " + << origin << " to " << dstTabletId); + + auto key = std::make_pair(origin, dstTabletId); + NKikimrTx::TReadSetData data; + data.SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT); + + TString bodyStr; + bool ok = data.SerializeToString(&bodyStr); + Y_VERIFY(ok, "Failed to serialize readset from %" PRIu64 " to %" PRIu64, key.first, key.second); + + tx->OutReadSets()[key] = std::move(bodyStr); + } + } + + bool receiveLocks = ReceiveLocks(kqpTx.GetLocks(), origin); + if (receiveLocks) { + // Note: usually only shards with side-effects receive locks, since they + // need the final outcome to decide whether to commit or abort. 
+ for (ui64 srcTabletId : kqpTx.GetLocks().GetSendingShards()) { + if (srcTabletId == origin) { + // Don't await decision from ourselves + continue; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Will wait for volatile decision from " + << srcTabletId << " to " << origin); + + tx->AwaitingDecisions().insert(srcTabletId); + } + + bool aborted = false; + + for (auto& record : tx->DelayedInReadSets()) { + ui64 srcTabletId = record.GetTabletSource(); + ui64 dstTabletId = record.GetTabletDest(); + if (!tx->AwaitingDecisions().contains(srcTabletId) || dstTabletId != origin) { + // Don't process unexpected readsets + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Ignoring unexpected readset from " + << srcTabletId << " to " << dstTabletId << " for txId# " << tx->GetTxId() << " at tablet " << origin); + continue; + } + + if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) { + // No readset data: participant aborted the transaction + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed no data readset from" + << srcTabletId << " to " << dstTabletId << " will abort txId# " << tx->GetTxId()); + aborted = true; + break; + } + + NKikimrTx::TReadSetData data; + bool ok = data.ParseFromString(record.GetReadSet()); + Y_VERIFY(ok, "Failed to parse readset from %" PRIu64 " to %" PRIu64, srcTabletId, dstTabletId); + + if (data.GetDecision() != NKikimrTx::TReadSetData::DECISION_COMMIT) { + // Explicit decision that is not a commit, need to abort + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed decision " + << ui32(data.GetDecision()) << " from " << srcTabletId << " to " << dstTabletId + << " for txId# " << tx->GetTxId()); + aborted = true; + break; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed commit decision from " + << srcTabletId << " to " << dstTabletId << " for txId# " << tx->GetTxId()); + tx->AwaitingDecisions().erase(srcTabletId); 
+ } + + if (aborted) { + tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>( + NKikimrTxDataShard::TX_KIND_DATA, + origin, + tx->GetTxId(), + NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED); + + return false; + } + } + + return true; +} + void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 52086ea0f70..458a103f912 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -35,6 +35,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId); bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); +bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc); diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp index bb2a1ad6ef8..6e57ee9bec4 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.cpp +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -110,11 +110,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) { } void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { - for (auto& pr : Self.GetUserTables()) { - auto tid = pr.second->LocalTid; - // Removing the lock also removes any uncommitted data - if (DB.HasOpenTx(tid, lockId)) { - DB.RemoveTx(tid, lockId); + // We remove lock changes unless it's managed by volatile tx manager + if (!Self.GetVolatileTxManager().FindByCommitTxId(lockId)) { + for (auto& pr : Self.GetUserTables()) { + auto tid = pr.second->LocalTid; + // Removing the lock also removes any 
uncommitted data + if (DB.HasOpenTx(tid, lockId)) { + DB.RemoveTx(tid, lockId); + } } } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 6f95c80acad..c30648109ef 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1188,7 +1188,18 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; + auto badRequest = [&](const TString& error) { + tx->SetAbortedFlag(); + tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult( + rec.GetTxKind(), Self->TabletID(), tx->GetTxId(), NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)); + tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT, error); + + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); + }; + if (tx->IsSchemeTx()) { + Y_VERIFY(!tx->HasVolatilePrepareFlag(), "Volatile scheme transactions not supported"); + Y_VERIFY(rec.HasSchemeShardId()); Y_VERIFY(rec.HasProcessingParams()); @@ -1216,6 +1227,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } + if (tx->HasVolatilePrepareFlag()) { + badRequest("Unsupported volatile snapshot tx"); + return tx; + } + if (tx->GetSnapshotTx().HasCreateVolatileSnapshot()) { tx->SetReadOnlyFlag(); tx->SetGlobalReaderFlag(); @@ -1231,6 +1247,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } + if (tx->HasVolatilePrepareFlag()) { + badRequest("Unsupported volatile distributed erase"); + return tx; + } + tx->SetGlobalWriterFlag(); if (tx->GetDistributedEraseTx()->HasDependents()) { tx->SetGlobalReaderFlag(); @@ -1241,6 +1262,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } + if (tx->HasVolatilePrepareFlag()) { + badRequest("Unsupported 
volatile commit writes"); + return tx; + } + tx->SetGlobalWriterFlag(); } else { Y_VERIFY(tx->IsReadTable() || tx->IsDataTx()); @@ -1277,6 +1303,35 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: tx->SetGlobalReaderFlag(); } + // Additional checks for volatile transactions + if (tx->HasVolatilePrepareFlag()) { + if (tx->HasImmediateFlag()) { + badRequest(TStringBuilder() + << "Volatile distributed tx " << tx->GetTxId() + << " at tablet " << Self->TabletID() + << " cannot be immediate"); + return tx; + } + + if (!dataTx->IsKqpDataTx()) { + badRequest(TStringBuilder() + << "Volatile distributed tx " << tx->GetTxId() + << " at tablet " << Self->TabletID() + << " must be a kqp data tx"); + return tx; + } + + if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) { + badRequest(TStringBuilder() + << "Volatile distributed tx " << tx->GetTxId() + << " at tablet " << Self->TabletID() + << " cannot have persistent channels"); + return tx; + } + + Y_VERIFY(!tx->IsImmediate(), "Sanity check failed: volatile tx cannot be immediate"); + } + // Make config checks for immediate tx. if (tx->IsImmediate()) { if (Config.NoImmediate() || (Config.ForceOnlineRW() && !dataTx->ReadOnly())) { @@ -1310,26 +1365,10 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: if (!tx->IsMvccSnapshotRead()) { // No op } else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) { - tx->SetAbortedFlag(); - TString err = "Ambiguous snapshot info. 
Cannot use both MVCC and read table snapshots in one transaction"; - tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(), - Self->TabletID(), - tx->GetTxId(), - NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)); - tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); - LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err); - + badRequest("Ambiguous snapshot info. Cannot use both MVCC and read table snapshots in one transaction"); return tx; } else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) { - tx->SetAbortedFlag(); - TString err = "Ambiguous snapshot info. Cannot use both MVCC and kqp scan snapshots in one transaction"; - tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(), - Self->TabletID(), - tx->GetTxId(), - NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)); - tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT,err); - LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err); - + badRequest("Ambiguous snapshot info. 
Cannot use both MVCC and kqp scan snapshots in one transaction"); return tx; } @@ -1350,15 +1389,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: }; if(tx->IsMvccSnapshotRead() && !allowSnapshot()) { - tx->SetAbortedFlag(); - TString err = "Snapshot read must be an immediate read only or locked write transaction"; - tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(), - Self->TabletID(), - tx->GetTxId(), - NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)); - tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT,err); - LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err); - + badRequest("Snapshot read must be an immediate read-only or locked-write transaction"); return tx; } diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 0258c9c2e3b..efec2da9e31 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -33,23 +33,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); - auto forceVolatile = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { - switch (ev->GetTypeRewrite()) { - case TEvDataShard::TEvProposeTransaction::EventType: { - auto* msg = ev->Get<TEvDataShard::TEvProposeTransaction>(); - auto flags = msg->Record.GetFlags(); - if (!(flags & TTxFlags::Immediate)) { - Cerr << "... 
forcing propose to use volatile prepare" << Endl; - flags |= TTxFlags::VolatilePrepare; - msg->Record.SetFlags(flags); - } - break; - } - } - return TTestActorRuntimeBase::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(forceVolatile); - + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); Cerr << "!!! distributed write start" << Endl; @@ -59,8 +43,24 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { )"); Cerr << "!!! distributed write end" << Endl; - runtime.SetObserverFunc(prevObserverFunc); + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } }"); + + const auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + RebootTablet(runtime, shards1.at(0), sender); + // We should see same results after restart UNIT_ASSERT_VALUES_EQUAL( KqpSimpleExec(runtime, R"( SELECT key, value FROM `/Root/table-1` diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index d211051c9ab..54365b8bef2 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -166,7 +166,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } } - if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) { + const bool validated = op->HasVolatilePrepareFlag() + ? 
KqpValidateVolatileTx(tabletId, tx, DataShard.SysLocksTable()) + : KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable()); + + if (!validated) { KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); DataShard.SysLocksTable().ApplyLocks(); DataShard.SubscribeNewLocks(ctx); @@ -199,6 +203,10 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->SetReadVersion(readVersion); dataTx->SetWriteVersion(writeVersion); + if (op->HasVolatilePrepareFlag()) { + dataTx->SetVolatileTxId(tx->GetTxId()); + } + KqpCommitLocks(tabletId, tx, writeVersion, DataShard, txc); auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); @@ -272,15 +280,33 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio DataShard.SysLocksTable().BreakSetLocks(); } + // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies + // Such transactions would have no participants and become immediately committed + auto commitTxIds = dataTx->GetVolatileCommitTxIds(); + if (commitTxIds) { + TVector<ui64> participants(tx->AwaitingDecisions().begin(), tx->AwaitingDecisions().end()); + DataShard.GetVolatileTxManager().PersistAddVolatileTx( + tx->GetTxId(), + writeVersion, + commitTxIds, + dataTx->GetVolatileDependencies(), + participants, + txc); + } + + if (op->HasVolatilePrepareFlag() && !op->OutReadSets().empty()) { + DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx); + } + + // Note: may erase persistent locks, must be after we persist volatile tx AddLocksToResult(op, ctx); - auto changes = std::move(dataTx->GetCollectedChanges()); - if (guardLocks.LockTxId) { - if (changes) { - DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes)); + if (auto changes = std::move(dataTx->GetCollectedChanges())) { + if (commitTxIds || guardLocks.LockTxId) { + DataShard.AddLockChangeRecords(commitTxIds ? 
tx->GetTxId() : guardLocks.LockTxId, std::move(changes)); + } else { + op->ChangeRecords() = std::move(changes); } - } else { - op->ChangeRecords() = std::move(changes); } KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); @@ -351,12 +377,14 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds()); op->ResetCurrentTimer(); - if (op->IsReadOnly() && !locksDb.HasChanges()) { + if (txc.DB.HasChanges()) { + op->SetWaitCompletionFlag(true); + } else if (op->IsReadOnly()) { return EExecutionStatus::Executed; } - if (locksDb.HasChanges()) { - op->SetWaitCompletionFlag(true); + if (op->HasVolatilePrepareFlag() && !op->PreparedOutReadSets().empty()) { + return EExecutionStatus::DelayCompleteNoMoreRestarts; } return EExecutionStatus::ExecutedNoMoreRestarts; @@ -395,7 +423,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, return EExecutionStatus::Restart; } -void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {} +void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr op, const TActorContext& ctx) { + if (op->HasVolatilePrepareFlag() && !op->PreparedOutReadSets().empty()) { + DataShard.SendReadSets(ctx, std::move(op->PreparedOutReadSets())); + } +} THolder<TExecutionUnit> CreateExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) { return THolder(new TExecuteKqpDataTxUnit(dataShard, pipeline)); diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index fb182f810ca..ca43b25cabe 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -14,6 +14,7 @@ #include <ydb/core/tx/balance_coverage/balance_coverage_builder.h> #include <ydb/core/tx/tx_processing.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> #include <library/cpp/containers/flat_hash/flat_hash.h> #include <util/generic/hash.h> @@ -419,6 +420,7 
@@ struct TInputOpData { using TSnapshots = TVector<TIntrusivePtr<TTableSnapshotContext>>; using TInReadSets = TMap<std::pair<ui64, ui64>, TVector<TRSData>>; using TCoverageBuilders = TMap<std::pair<ui64, ui64>, std::shared_ptr<TBalanceCoverageBuilder>>; + using TAwaitingDecisions = absl::flat_hash_set<ui64>; TInputOpData() : RemainReadSets(0) @@ -435,6 +437,7 @@ struct TInputOpData { ui32 RemainReadSets; TAutoPtr<IDestructable> ScanResult; TAutoPtr<IDestructable> AsyncJobResult; + TAwaitingDecisions AwaitingDecisions; }; struct TOutputOpData { @@ -590,6 +593,8 @@ public: void SetAsyncJobResult(TAutoPtr<IDestructable> prod) { InputDataRef().AsyncJobResult = prod; } bool HasAsyncJobResult() const { return InputData ? (bool)InputData->AsyncJobResult : false; } + TInputOpData::TAwaitingDecisions &AwaitingDecisions() { return InputDataRef().AwaitingDecisions; } + //////////////////////////////////////// // OUTPUT DATA // //////////////////////////////////////// diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp new file mode 100644 index 00000000000..06ecb6c927f --- /dev/null +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -0,0 +1,281 @@ +#include "volatile_tx.h" +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + + void TVolatileTxManager::TTxMap::Add(ui64 txId, TRowVersion version) { + Map[txId] = version; + } + + void TVolatileTxManager::TTxMap::Remove(ui64 txId) { + Map.erase(txId); + } + + const TRowVersion* TVolatileTxManager::TTxMap::Find(ui64 txId) const { + auto it = Map.find(txId); + if (it != Map.end()) { + return &it->second; + } + return nullptr; + } + + void TVolatileTxManager::Clear() { + VolatileTxs.clear(); + VolatileTxByCommitTxId.clear(); + TxMap.Reset(); + } + + bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) { + using Schema = TDataShard::Schema; + + Y_VERIFY(VolatileTxs.empty() && VolatileTxByCommitTxId.empty() && !TxMap, + "Unexpected Load into non-empty volatile tx manager"); + + // Tables 
may not exist in some inactive shards, which cannot have transactions + if (db.HaveTable<Schema::TxVolatileDetails>() && + db.HaveTable<Schema::TxVolatileParticipants>()) + { + if (!LoadTxDetails(db)) { + return false; + } + if (!LoadTxParticipants(db)) { + return false; + } + } + + return true; + } + + bool TVolatileTxManager::LoadTxDetails(NIceDb::TNiceDb& db) { + using Schema = TDataShard::Schema; + + auto rowset = db.Table<Schema::TxVolatileDetails>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + ui64 txId = rowset.GetValue<Schema::TxVolatileDetails::TxId>(); + EVolatileTxState state = rowset.GetValue<Schema::TxVolatileDetails::State>(); + auto details = rowset.GetValue<Schema::TxVolatileDetails::Details>(); + + Y_VERIFY_S(txId == details.GetTxId(), + "Volatile txId# " << txId << " has unexpected details with txId# " << details.GetTxId()); + + auto res = VolatileTxs.insert( + std::make_pair(txId, std::make_unique<TVolatileTxInfo>())); + Y_VERIFY_S(res.second, "Unexpected duplicate volatile txId# " << txId); + + auto* info = res.first->second.get(); + info->TxId = txId; + info->State = state; + info->Version = TRowVersion(details.GetVersionStep(), details.GetVersionTxId()); + info->CommitTxIds.insert(details.GetCommitTxIds().begin(), details.GetCommitTxIds().end()); + info->Dependencies.insert(details.GetDependencies().begin(), details.GetDependencies().end()); + info->AddCommitted = true; // we loaded it from local db, so it is committed + + if (!rowset.Next()) { + return false; + } + } + + auto postProcessTxInfo = [this](TVolatileTxInfo* info) { + switch (info->State) { + case EVolatileTxState::Waiting: + case EVolatileTxState::Committed: { + if (!TxMap) { + TxMap = MakeIntrusive<TTxMap>(); + } + + // Waiting and Committed transactions need to be added to TxMap until they are fully resolved + // Note that aborted transactions are removed from TxMap and don't need to be re-added + for (ui64 commitTxId : 
info->CommitTxIds) { + auto res2 = VolatileTxByCommitTxId.emplace(commitTxId, info); + Y_VERIFY_S(res2.second, "Unexpected duplicate commitTxId# " << commitTxId); + TxMap->Add(commitTxId, info->Version); + } + + for (auto it = info->Dependencies.begin(); it != info->Dependencies.end(); /* nothing */) { + ui64 dependencyTxId = *it; + auto* dependency = FindByTxId(dependencyTxId); + if (!dependency || dependency->State == EVolatileTxState::Aborted) { + // Skip dependencies that have been aborted already + info->Dependencies.erase(it++); + continue; + } + dependency->Dependents.insert(info->TxId); + ++it; + } + + if (info->Dependencies.empty() && info->State == EVolatileTxState::Committed) { + // TODO: committed transactions without dependencies are ready to fully commit + } + + return; + } + + case EVolatileTxState::Aborted: { + // Aborted transactions don't have dependencies + info->Dependencies.clear(); + + // TODO: aborted transactions are ready to be rolled back + return; + } + } + + Y_VERIFY_S(false, "Unexpected volatile txId# " << info->TxId << " @" << info->Version << " with state# " << ui32(info->State)); + }; + + for (auto& pr : VolatileTxs) { + postProcessTxInfo(pr.second.get()); + } + + return true; + } + + bool TVolatileTxManager::LoadTxParticipants(NIceDb::TNiceDb& db) { + using Schema = TDataShard::Schema; + + auto rowset = db.Table<Schema::TxVolatileParticipants>().Select(); + if (!rowset.IsReady()) { + return false; + } + + TVolatileTxInfo* lastInfo = nullptr; + while (!rowset.EndOfSet()) { + ui64 txId = rowset.GetValue<Schema::TxVolatileParticipants::TxId>(); + ui64 shardId = rowset.GetValue<Schema::TxVolatileParticipants::ShardId>(); + + auto* info = (lastInfo && lastInfo->TxId == txId) ? 
lastInfo : FindByTxId(txId); + Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId); + + // Only waiting transactions may have participants + Y_VERIFY_S(info->State == EVolatileTxState::Waiting, + "Unexpected volatile txId# " << txId << " with participant# " << shardId + << " in state# " << ui32(info->State)); + + info->Participants.insert(shardId); + + if (!rowset.Next()) { + return false; + } + } + + for (auto& pr : VolatileTxs) { + auto* info = pr.second.get(); + + // Sanity check that there are no waiting transactions without participants + if (info->State == EVolatileTxState::Waiting) { + Y_VERIFY_S(!info->Participants.empty(), + "Unexpected waiting volatile txId# " << info->TxId << " without participants"); + } + } + + return true; + } + + TVolatileTxInfo* TVolatileTxManager::FindByTxId(ui64 txId) const { + auto it = VolatileTxs.find(txId); + if (it != VolatileTxs.end()) { + return it->second.get(); + } + return nullptr; + } + + TVolatileTxInfo* TVolatileTxManager::FindByCommitTxId(ui64 txId) const { + auto it = VolatileTxByCommitTxId.find(txId); + if (it != VolatileTxByCommitTxId.end()) { + return it->second; + } + return nullptr; + } + + void TVolatileTxManager::PersistAddVolatileTx( + ui64 txId, const TRowVersion& version, + TConstArrayRef<ui64> commitTxIds, + TConstArrayRef<ui64> dependencies, + TConstArrayRef<ui64> participants, + TTransactionContext& txc) + { + using Schema = TDataShard::Schema; + + Y_VERIFY_S(!commitTxIds.empty(), + "Unexpected volatile txId# " << txId << " @" << version << " without commits"); + + auto res = VolatileTxs.insert( + std::make_pair(txId, std::make_unique<TVolatileTxInfo>())); + Y_VERIFY_S(res.second, "Cannot add volatile txId# " << txId << " @" << version + << ": duplicate volatile tx @" << res.first->second->Version << " already exists"); + + auto* info = res.first->second.get(); + info->TxId = txId; + info->Version = version; + info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end()); +
info->Dependencies.insert(dependencies.begin(), dependencies.end()); + info->Participants.insert(participants.begin(), participants.end()); + + if (info->Participants.empty()) { + // Transaction is committed when we don't have to wait for other participants + info->State = EVolatileTxState::Committed; + } + + if (!TxMap) { + TxMap = MakeIntrusive<TTxMap>(); + } + + for (ui64 commitTxId : commitTxIds) { + auto res2 = VolatileTxByCommitTxId.emplace(commitTxId, info); + Y_VERIFY_S(res2.second, "Cannot add volatile txId# " << txId << " @" << version << " with commitTxId# " << commitTxId + << ": already registered for txId# " << res.first->second->TxId << " @" << res.first->second->Version); + TxMap->Add(commitTxId, version); + } + + for (ui64 dependencyTxId : info->Dependencies) { + auto* dependency = FindByTxId(dependencyTxId); + Y_VERIFY_S(dependency, "Cannot find dependency txId# " << dependencyTxId + << " for volatile txId# " << txId << " @" << version); + Y_VERIFY_S(dependency->State != EVolatileTxState::Aborted, + "Unexpected aborted dependency txId# " << dependencyTxId + << " for volatile txId# " << txId << " @" << version); + dependency->Dependents.insert(txId); + } + + NIceDb::TNiceDb db(txc.DB); + + NKikimrTxDataShard::TTxVolatileDetails details; + details.SetTxId(txId); + details.SetVersionStep(version.Step); + details.SetVersionTxId(version.TxId); + + if (!info->CommitTxIds.empty()) { + auto* m = details.MutableCommitTxIds(); + m->Add(info->CommitTxIds.begin(), info->CommitTxIds.end()); + std::sort(m->begin(), m->end()); + } + + if (!info->Dependencies.empty()) { + auto* m = details.MutableDependencies(); + m->Add(info->Dependencies.begin(), info->Dependencies.end()); + std::sort(m->begin(), m->end()); + } + + db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update( + NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State), + NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details))); + for (ui64 shardId : participants) { + 
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update(); + } + + txc.OnCommit([this, txId]() { + auto* info = FindByTxId(txId); + Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId); + Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId); + info->AddCommitted = true; + // TODO: activate pending responses + }); + txc.OnRollback([txId]() { + Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId); + }); + } + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h new file mode 100644 index 00000000000..6c509a456af --- /dev/null +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -0,0 +1,135 @@ +#pragma once +#include <ydb/core/tablet_flat/flat_table_committed.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> +#include <util/generic/hash.h> + +namespace NKikimr::NTabletFlatExecutor { + + class TTransactionContext; + +} // namespace NKikimr::NTabletFlatExecutor + +namespace NKikimr::NIceDb { + + class TNiceDb; + +} // namespace NKikimr::NIceDb + +namespace NKikimr::NDataShard { + + class TDataShard; + + enum class EVolatileTxState : ui32 { + // Volatile transaction is waiting for decisions from other participants + Waiting = 0, + // Volatile transaction is logically committed, but not yet committed in storage + Committed = 1, + // Volatile transaction is aborted, waiting to be garbage collected + Aborted = 2, + }; + + struct TVolatileTxInfo { + ui64 TxId; + EVolatileTxState State = EVolatileTxState::Waiting; + TRowVersion Version; + absl::flat_hash_set<ui64> CommitTxIds; + absl::flat_hash_set<ui64> Dependencies; + absl::flat_hash_set<ui64> Dependents; + absl::flat_hash_set<ui64> Participants; + bool AddCommitted = false; + }; + + class TVolatileTxManager { + using TTransactionContext = NKikimr::NTabletFlatExecutor::TTransactionContext; 
+ + private: + /** + * A custom implementation of tx map without any copy-on-write support, + * instead the map is changed directly whenever the set of transactions + * changes, possibly affecting existing iterators. Since this tx map + * is never used for scans it should be safe to do. + */ + class TTxMap final : public NTable::ITransactionMap { + public: + void Add(ui64 txId, TRowVersion version); + void Remove(ui64 txId); + + bool Empty() const { + return Map.empty(); + } + + const TRowVersion* Find(ui64 txId) const override; + + private: + absl::flat_hash_map<ui64, TRowVersion> Map; + }; + + /** + * A helper access class that automatically converts to the optimal + * executor argument type. + */ + class TTxMapAccess { + public: + TTxMapAccess(const TIntrusivePtr<TTxMap>& txMap) + : TxMap(txMap) + { } + + explicit operator bool() const { + return TxMap && !TxMap->Empty(); + } + + operator NTable::ITransactionMapPtr() const { + if (TxMap && !TxMap->Empty()) { + return TxMap; + } else { + return nullptr; + } + } + + operator NTable::ITransactionMapSimplePtr() const { + if (TxMap && !TxMap->Empty()) { + return static_cast<NTable::ITransactionMap*>(TxMap.Get()); + } else { + return nullptr; + } + } + + private: + const TIntrusivePtr<TTxMap>& TxMap; + }; + + public: + TVolatileTxManager(TDataShard* self) + : Self(self) + { } + + void Clear(); + bool Load(NIceDb::TNiceDb& db); + + TVolatileTxInfo* FindByTxId(ui64 txId) const; + TVolatileTxInfo* FindByCommitTxId(ui64 txId) const; + + void PersistAddVolatileTx( + ui64 txId, const TRowVersion& version, + TConstArrayRef<ui64> commitTxIds, + TConstArrayRef<ui64> dependencies, + TConstArrayRef<ui64> participants, + TTransactionContext& txc); + + TTxMapAccess GetTxMap() const { + return TTxMapAccess(TxMap); + } + + private: + bool LoadTxDetails(NIceDb::TNiceDb& db); + bool LoadTxParticipants(NIceDb::TNiceDb& db); + + private: + TDataShard* const Self; + absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs;
// TxId -> Info + absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info + TIntrusivePtr<TTxMap> TxMap; + }; + +} // namespace NKikimr::NDataShard diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index ceb05783686..20d676d546b 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1725,6 +1725,95 @@ } }, { + "TableId": 32, + "TableName": "TxVolatileDetails", + "TableKey": [ + 1 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "TxId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "State", + "ColumnType": "Uint32" + }, + { + "ColumnId": 3, + "ColumnName": "Details", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { + "TableId": 33, + "TableName": "TxVolatileParticipants", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "TxId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "ShardId", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { 
"TableId": 101, "TableName": "LockChangeRecords", "TableKey": [ |