aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-12-20 13:24:24 +0300
committersnaury <snaury@ydb.tech>2022-12-20 13:24:24 +0300
commit0b3d10fb0817ad9b03497ef999ffc235ddac29fe (patch)
tree53dc86dffc2a34c5af2846ead030814f7c1ae1e7
parent58e50341cc234a4291453eea2ccd0ab451f07c2f (diff)
downloadydb-0b3d10fb0817ad9b03497ef999ffc235ddac29fe.tar.gz
Implement execution of volatile transactions
-rw-r--r--ydb/core/base/pathid.h5
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp121
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/tx_datashard.proto15
-rw-r--r--ydb/core/scheme/scheme_tabledefs.h5
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp9
-rw-r--r--ydb/core/tablet_flat/flat_database.h5
-rw-r--r--ydb/core/tablet_flat/flat_table_committed.h45
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin.txt3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux.txt3
-rw-r--r--ydb/core/tx/datashard/datashard.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp120
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h4
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp19
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h4
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h40
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp128
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h1
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp13
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp85
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp36
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp54
-rw-r--r--ydb/core/tx/datashard/operation.h5
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp281
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h135
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema89
32 files changed, 1142 insertions, 100 deletions
diff --git a/ydb/core/base/pathid.h b/ydb/core/base/pathid.h
index e974f846134..711c01593f9 100644
--- a/ydb/core/base/pathid.h
+++ b/ydb/core/base/pathid.h
@@ -26,6 +26,11 @@ struct TPathId {
{
}
+ template<typename H>
+ friend H AbslHashValue(H h, const TPathId& pathId) {
+ return H::combine(std::move(h), pathId.OwnerId, pathId.LocalPathId);
+ }
+
ui64 Hash() const;
TString ToString() const;
void Out(IOutputStream& o) const;
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt b/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt
index f621755b9ed..75f886c323d 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.darwin.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
index 425b63a036e..72d59cdd50d 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux.txt
index 425b63a036e..72d59cdd50d 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.linux.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.linux.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-client-minikql_compile
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index c9a849a16a7..db861f006d2 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -28,6 +28,8 @@
#include <ydb/library/yql/dq/tasks/dq_connection_builder.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
+
namespace NKikimr {
namespace NKqp {
@@ -1358,7 +1360,8 @@ private:
SelfId(),
TxId,
dataTransaction.SerializeAsString(),
- ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0);
+ (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0) |
+ (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0));
}
auto traceId = ExecuterSpan.GetTraceId();
@@ -1645,6 +1648,11 @@ private:
// Single-shard transactions are always immediate
ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1;
+ if (ImmediateTx) {
+ // Transaction cannot be both immediate and volatile
+ YQL_ENSURE(!VolatileTx);
+ }
+
switch (Request.IsolationLevel) {
// OnlineRO with AllowInconsistentReads = true
case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
@@ -1652,6 +1660,7 @@ private:
// (legacy behaviour, for compatibility with current execution engine)
case NKikimrKqp::ISOLATION_LEVEL_READ_STALE:
YQL_ENSURE(ReadOnlyTx);
+ YQL_ENSURE(!VolatileTx);
ImmediateTx = true;
break;
@@ -1661,6 +1670,7 @@ private:
if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) {
// Snapshot reads are always immediate
+ YQL_ENSURE(!VolatileTx);
Snapshot = Request.Snapshot;
ImmediateTx = true;
}
@@ -1707,6 +1717,7 @@ private:
AppData()->FeatureFlags.GetEnableMvccSnapshotReads());
if (forceSnapshot) {
+ YQL_ENSURE(!VolatileTx);
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
@@ -1810,52 +1821,111 @@ private:
Request.TopicOperations.BuildTopicTxs(topicTxs);
- const bool useGenericReadSets = AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || !topicTxs.empty();
+ const bool needRollback = Request.EraseLocks && !Request.ValidateLocks;
+
+ const bool uncommittedWrites = AppData()->FeatureFlags.GetEnableKqpImmediateEffects() && Request.Snapshot.IsValid();
+
+ VolatileTx = (
+ // We want to use volatile transactions only when the feature is enabled
+ AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions() &&
+ // We don't want volatile tx when acquiring locks
+ !Request.AcquireLocksTxId &&
+ // We don't want readonly volatile transactions
+ !ReadOnlyTx &&
+ // We only want to use volatile transactions with side-effects
+ !ShardsWithEffects.empty() &&
+ // We don't want to use volatile transactions when doing a rollback
+ !needRollback &&
+ // We cannot use volatile transaction for uncommitted writes
+ !uncommittedWrites &&
+ // We cannot use volatile transactions with topics
+ // TODO: add support in the future
+ topicTxs.empty() &&
+ // We only want to use volatile transactions for multiple shards
+ (datashardTasks.size() + topicTxs.size()) > 1 &&
+ // We cannot use volatile transactions with persistent channels
+ // Note: currently persistent channels are never used
+ !HasPersistentChannels);
+
+ const bool useGenericReadSets = (
+ // Use generic readsets when feature is explicitly enabled
+ AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
+ // Volatile transactions must always use generic readsets
+ VolatileTx ||
+ // Transactions with topics must always use generic readsets
+ !topicTxs.empty());
+
+ if (auto locksMap = ExtractLocks(Request.Locks);
+ !locksMap.empty() ||
+ VolatileTx ||
+ Request.TopicOperations.HasReadOperations())
+ {
+ YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks || VolatileTx);
- if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty() || Request.TopicOperations.HasReadOperations()) {
- YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks);
+ bool needCommit = (
+ (Request.ValidateLocks && Request.EraseLocks) ||
+ VolatileTx);
- auto locksOp = Request.ValidateLocks && Request.EraseLocks
+ auto locksOp = needCommit
? NKikimrTxDataShard::TKqpLocks::Commit
: (Request.ValidateLocks
? NKikimrTxDataShard::TKqpLocks::Validate
: NKikimrTxDataShard::TKqpLocks::Rollback);
- TSet<ui64> taskShardIds;
- if (Request.ValidateLocks) {
+ absl::flat_hash_set<ui64> sendingShardsSet;
+ absl::flat_hash_set<ui64> receivingShardsSet;
+
+ // Gather shards that need to send/receive readsets (shards with effects)
+ if (needCommit) {
for (auto& [shardId, _] : datashardTasks) {
if (ShardsWithEffects.contains(shardId)) {
- taskShardIds.insert(shardId);
+ // Volatile transactions may abort effects, so they send readsets
+ if (VolatileTx) {
+ sendingShardsSet.insert(shardId);
+ }
+ receivingShardsSet.insert(shardId);
}
}
- taskShardIds.merge(Request.TopicOperations.GetReceivingTabletIds());
+ if (auto tabletIds = Request.TopicOperations.GetSendingTabletIds()) {
+ sendingShardsSet.insert(tabletIds.begin(), tabletIds.end());
+ }
+
+ if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
+ receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
+ }
}
- TSet<ui64> locksSendingShards;
+ // Gather locks that need to be committed or erased
for (auto& [shardId, locksList] : locksMap) {
auto& tx = datashardTxs[shardId];
tx.MutableLocks()->SetOp(locksOp);
- for (auto& lock : locksList) {
- tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock);
- }
+ if (!locksList.empty()) {
+ auto* protoLocks = tx.MutableLocks()->MutableLocks();
+ protoLocks->Reserve(locksList.size());
+ for (auto& lock : locksList) {
+ protoLocks->Add()->Swap(&lock);
+ }
- if (!locksList.empty() && Request.ValidateLocks) {
- locksSendingShards.insert(shardId);
+ if (needCommit) {
+ // We also send the result on commit
+ sendingShardsSet.insert(shardId);
+ }
}
}
- locksSendingShards.merge(Request.TopicOperations.GetSendingTabletIds());
+ if (Request.ValidateLocks || needCommit) {
+ NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
+ NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end());
- if (Request.ValidateLocks) {
- NProtoBuf::RepeatedField<ui64> sendingShards(locksSendingShards.begin(), locksSendingShards.end());
- NProtoBuf::RepeatedField<ui64> receivingShards(taskShardIds.begin(), taskShardIds.end());
+ std::sort(sendingShards.begin(), sendingShards.end());
+ std::sort(receivingShards.begin(), receivingShards.end());
for (auto& [shardId, shardTx] : datashardTxs) {
shardTx.MutableLocks()->SetOp(locksOp);
- shardTx.MutableLocks()->MutableSendingShards()->CopyFrom(sendingShards);
- shardTx.MutableLocks()->MutableReceivingShards()->CopyFrom(receivingShards);
+ *shardTx.MutableLocks()->MutableSendingShards() = sendingShards;
+ *shardTx.MutableLocks()->MutableReceivingShards() = receivingShards;
}
for (auto& [_, tx] : topicTxs) {
@@ -1869,12 +1939,12 @@ private:
case NKikimrTxDataShard::TKqpLocks::Rollback:
tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback);
break;
- default:
+ case NKikimrTxDataShard::TKqpLocks::Unspecified:
break;
}
- tx.MutableSendingShards()->CopyFrom(sendingShards);
- tx.MutableReceivingShards()->CopyFrom(receivingShards);
+ *tx.MutableSendingShards() = sendingShards;
+ *tx.MutableReceivingShards() = receivingShards;
}
}
}
@@ -2009,6 +2079,7 @@ private:
<< ", readonly: " << ReadOnlyTx
<< ", datashardTxs: " << DatashardTxs.size()
<< ", topicTxs: " << Request.TopicOperations.GetSize()
+ << ", volatile: " << VolatileTx
<< ", immediate: " << ImmediateTx
<< ", remote tasks" << remoteComputeTasksCnt
<< ", useFollowers: " << UseFollowers);
@@ -2028,6 +2099,7 @@ private:
auto lockTxId = Request.AcquireLocksTxId;
if (lockTxId.Defined() && *lockTxId == 0) {
lockTxId = TxId;
+ LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
}
for (auto& tx : topicTxs) {
@@ -2190,6 +2262,7 @@ private:
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<TKqpExecuterTxResult> Results;
bool ReadOnlyTx = true;
+ bool VolatileTx = false;
bool ImmediateTx = false;
bool UseFollowers = false;
bool TxPlanned = false;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index f1873b966db..a48324b33db 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -759,6 +759,7 @@ message TFeatureFlags {
optional bool EnableAlterDatabaseCreateHiveFirst = 82 [default = false];
reserved 83; // EnableKqpDataQuerySourceRead
optional bool EnableSmallDiskOptimization = 84 [default = true];
+ optional bool EnableDataShardVolatileTransactions = 85 [default = false];
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 8aaa9ec8019..2c37a991a78 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1798,3 +1798,18 @@ message TEvReplicationSourceOffsetsCancel {
// Cancels a previously started read for (Sender, ReadId)
optional uint64 ReadId = 1;
}
+
+message TTxVolatileDetails {
+ optional uint64 TxId = 1;
+
+ // Step and TxId that should be used for the eventual commit
+ optional uint64 VersionStep = 2;
+ optional uint64 VersionTxId = 3;
+
+ // A list of local db transactions that need to be committed on success or rolled back on failure
+ repeated uint64 CommitTxIds = 4 [packed = true];
+
+ // A list of volatile transaction ids that need to be committed or rolled back before this transaction commits.
+ // This is because local db changes must be committed in the same order they performed writes.
+ repeated uint64 Dependencies = 5 [packed = true];
+}
diff --git a/ydb/core/scheme/scheme_tabledefs.h b/ydb/core/scheme/scheme_tabledefs.h
index 85ca8f929a0..a15d333964f 100644
--- a/ydb/core/scheme/scheme_tabledefs.h
+++ b/ydb/core/scheme/scheme_tabledefs.h
@@ -89,6 +89,11 @@ struct TTableId {
return !operator==(x);
}
+ template<typename H>
+ friend H AbslHashValue(H h, const TTableId& tableId) {
+ return H::combine(std::move(h), tableId.PathId, tableId.SysViewInfo, tableId.SchemaVersion);
+ }
+
ui64 Hash() const noexcept {
auto hash = PathId.Hash();
if (SysViewInfo) {
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 8cb61e0fee7..a2cd6cc3291 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -620,6 +620,13 @@ bool TDatabase::ValidateCommit(TString &err)
return true;
}
+bool TDatabase::HasChanges() const
+{
+ Y_VERIFY(Redo, "Transaction is not in progress");
+
+ return *Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions;
+}
+
TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator *cookieAllocator)
{
TempIterators.clear();
@@ -636,7 +643,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
IteratedTables.clear();
}
- if (commit && (*Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions)) {
+ if (commit && HasChanges()) {
Y_VERIFY(stamp >= Change->Stamp);
Y_VERIFY(DatabaseImpl->Serial() == Change->Serial);
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index a30e4190fa8..96b194ac746 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -196,6 +196,11 @@ public:
TKeyRangeCache* DebugGetTableErasedKeysCache(ui32 table) const;
+ /**
+ * Returns true when current transaction has changes to commit
+ */
+ bool HasChanges() const;
+
// executor interface
void Begin(TTxStamp, IPages& env);
TProd Commit(TTxStamp, bool commit, TCookieAllocator* = nullptr);
diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h
index b2f73153c20..423f172ae28 100644
--- a/ydb/core/tablet_flat/flat_table_committed.h
+++ b/ydb/core/tablet_flat/flat_table_committed.h
@@ -138,6 +138,51 @@ namespace NTable {
};
/**
+ * A special implementation of a dynamic transaction map with a possible base
+ */
+ class TDynamicTransactionMap final : public ITransactionMap {
+ public:
+ TDynamicTransactionMap() = default;
+
+ explicit TDynamicTransactionMap(ITransactionMapPtr base)
+ : Base(std::move(base))
+ { }
+
+ public:
+ const TRowVersion* Find(ui64 txId) const override {
+ auto it = Values.find(txId);
+ if (it != Values.end()) {
+ return &it->second;
+ }
+ return Base.Find(txId);
+ }
+
+ void SetBase(ITransactionMapPtr base) {
+ Base = std::move(base);
+ }
+
+ void Add(ui64 txId, TRowVersion version) {
+ Values[txId] = version;
+ }
+
+ void Remove(ui64 txId) {
+ Values.erase(txId);
+ }
+
+ void Clear() {
+ Values.clear();
+ }
+
+ bool Empty() const {
+ return Values.empty() && !Base;
+ }
+
+ private:
+ ITransactionMapPtr Base;
+ absl::flat_hash_map<ui64, TRowVersion> Values;
+ };
+
+ /**
* A simple copy-on-write data structure for a TxId -> RowVersion map
*
* Pretends to be an instance of ITransactionMapPtr
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index e8c17c91489..48eeae29cb3 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -44,6 +44,7 @@ public:
FEATURE_FLAG_SETTER(EnableKqpImmediateEffects)
FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets)
FEATURE_FLAG_SETTER(EnableAlterDatabaseCreateHiveFirst)
+ FEATURE_FLAG_SETTER(EnableDataShardVolatileTransactions)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt
index 5658b055b84..474414f0000 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt
@@ -48,6 +48,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
@@ -234,6 +235,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
@@ -300,6 +302,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index ec538ef3c0b..8a3825a0dc1 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -49,6 +49,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
@@ -235,6 +236,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
@@ -302,6 +304,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt
index ec538ef3c0b..8a3825a0dc1 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux.txt
@@ -49,6 +49,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
@@ -235,6 +236,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_distributed_erase_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/store_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
@@ -302,6 +304,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
contrib-libs-zstd
cpp-actors-core
+ cpp-containers-absl_flat_hash
cpp-containers-flat_hash
cpp-digest-md5
cpp-html-pcdata
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 757ddf7068c..59184c1707d 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -125,6 +125,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, SysLocks(this)
, SnapshotManager(this)
, SchemaSnapshotManager(this)
+ , VolatileTxManager(this)
, DisableByKeyFilter(0, 0, 1)
, MaxTxInFly(15000, 0, 100000)
, MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 46b2370d316..eebeb026f80 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -245,6 +245,10 @@ public:
return ReadVersion;
}
+ void SetVolatileTxId(ui64 txId) {
+ VolatileTxId = txId;
+ }
+
void SetIsImmediateTx() {
IsImmediateTx = true;
}
@@ -254,12 +258,12 @@ public:
}
IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override {
- auto it = ChangeCollectors.find(tableId);
+ auto it = ChangeCollectors.find(tableId.PathId);
if (it != ChangeCollectors.end()) {
return it->second.Get();
}
- it = ChangeCollectors.emplace(tableId, nullptr).first;
+ it = ChangeCollectors.emplace(tableId.PathId, nullptr).first;
if (!Self->IsUserTable(tableId)) {
return it->second.Get();
}
@@ -276,6 +280,19 @@ public:
return;
}
+ if (VolatileTxId) {
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+ "Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID());
+ if (VolatileCommitTxIds.insert(lockId).second) {
+ // Update TxMap to include the new commit
+ auto it = TxMaps.find(tableId.PathId);
+ if (it != TxMaps.end()) {
+ it->second->Add(lockId, WriteVersion);
+ }
+ }
+ return;
+ }
+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID());
DB.CommitTx(localTid, lockId, writeVersion);
@@ -314,6 +331,32 @@ public:
}
}
+ TVector<ui64> GetVolatileCommitTxIds() const {
+ TVector<ui64> commitTxIds;
+
+ if (!VolatileCommitTxIds.empty()) {
+ commitTxIds.reserve(VolatileCommitTxIds.size());
+ for (ui64 commitTxId : VolatileCommitTxIds) {
+ commitTxIds.push_back(commitTxId);
+ }
+ }
+
+ return commitTxIds;
+ }
+
+ TVector<ui64> GetVolatileDependencies() const {
+ TVector<ui64> dependencies;
+
+ if (!VolatileDependencies.empty()) {
+ dependencies.reserve(VolatileDependencies.size());
+ for (ui64 dependency : VolatileDependencies) {
+ dependencies.push_back(dependency);
+ }
+ }
+
+ return dependencies;
+ }
+
bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override {
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
@@ -490,29 +533,52 @@ public:
if (TSysTables::IsSystemTable(tableId))
return 0;
+ if (VolatileTxId) {
+ Y_VERIFY(!LockTxId);
+ VolatileCommitTxIds.insert(VolatileTxId);
+ return VolatileTxId;
+ }
+
return LockTxId;
}
NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const override {
- if (TSysTables::IsSystemTable(tableId) || !LockTxId)
+ if (TSysTables::IsSystemTable(tableId)) {
+ return nullptr;
+ }
+
+ auto baseTxMap = Self->GetVolatileTxManager().GetTxMap();
+ if (!baseTxMap && !VolatileTxId && !LockTxId) {
+ // Don't use tx map when there's nothing we want to view as committed
return nullptr;
+ }
// Don't use tx map when we know there's no write lock for a table
// Note: currently write lock implies uncommitted changes
- if (!Self->SysLocksTable().HasCurrentWriteLock(tableId)) {
+ if (!baseTxMap && !VolatileTxId && LockTxId && !Self->SysLocksTable().HasCurrentWriteLock(tableId)) {
return nullptr;
}
- auto& ptr = TxMaps[tableId];
+ auto& ptr = TxMaps[tableId.PathId];
if (!ptr) {
- // Uncommitted changes are visible in all possible snapshots
- ptr = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ ptr = new NTable::TDynamicTransactionMap(baseTxMap);
+ if (LockTxId) {
+ // Uncommitted changes are visible in all possible snapshots
+ ptr->Add(LockTxId, TRowVersion::Min());
+ } else if (VolatileTxId) {
+ // We want volatile changes to be visible at the write vrsion
+ ptr->Add(VolatileTxId, WriteVersion);
+ for (ui64 commitTxId : VolatileCommitTxIds) {
+ ptr->Add(commitTxId, WriteVersion);
+ }
+ }
}
return ptr;
}
NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override {
+ // TODO: handle volatile transactions
if (TSysTables::IsSystemTable(tableId) || !LockTxId)
return nullptr;
@@ -522,7 +588,7 @@ public:
return nullptr;
}
- auto& ptr = TxObservers[tableId];
+ auto& ptr = TxObservers[tableId.PathId];
if (!ptr) {
// This observer is supposed to find conflicts
ptr = new TReadTxObserver(this, tableId);
@@ -558,6 +624,7 @@ public:
}
void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override {
+ // TODO: handle volatile read dependencies
Host->CheckReadConflict(TableId, rowVersion);
}
@@ -644,6 +711,8 @@ public:
}
void OnSkipUncommitted(ui64 txId) override {
+ // TODO: we need to somehow remember that this lock must add a
+ // dependency to some other volatile tx at commit time.
if (!Host->Db.HasRemovedTx(LocalTid, txId)) {
++SkipCount;
if (!SelfFound) {
@@ -688,6 +757,9 @@ public:
}
void OnSkipUncommitted(ui64 txId) override {
+ // TODO: handle volatile write dependencies
+ // Note that all active volatile transactions will be uncommitted
+ // here, since we are not using tx map for conflict detection.
Host->BreakWriteConflict(txId);
}
@@ -734,10 +806,13 @@ private:
TInstant Now;
TRowVersion WriteVersion = TRowVersion::Max();
TRowVersion ReadVersion = TRowVersion::Min();
- THashSet<ui64> CommittedLockChanges;
- mutable THashMap<TTableId, THolder<IDataShardChangeCollector>> ChangeCollectors;
- mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps;
- mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers;
+ ui64 VolatileTxId = 0;
+ absl::flat_hash_set<ui64> CommittedLockChanges;
+ mutable absl::flat_hash_map<TPathId, THolder<IDataShardChangeCollector>> ChangeCollectors;
+ mutable absl::flat_hash_map<TPathId, TIntrusivePtr<NTable::TDynamicTransactionMap>> TxMaps;
+ mutable absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers;
+ mutable absl::flat_hash_set<ui64> VolatileCommitTxIds;
+ mutable absl::flat_hash_set<ui64> VolatileDependencies;
};
//
@@ -908,6 +983,13 @@ void TEngineBay::SetReadVersion(TRowVersion readVersion) {
ComputeCtx->SetReadVersion(readVersion);
}
+void TEngineBay::SetVolatileTxId(ui64 txId) {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ host->SetVolatileTxId(txId);
+}
+
void TEngineBay::SetIsImmediateTx() {
Y_VERIFY(EngineHost);
@@ -941,6 +1023,20 @@ void TEngineBay::ResetCollectedChanges() {
host->ResetCollectedChanges();
}
+TVector<ui64> TEngineBay::GetVolatileCommitTxIds() const {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ return host->GetVolatileCommitTxIds();
+}
+
+TVector<ui64> TEngineBay::GetVolatileDependencies() const {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ return host->GetVolatileDependencies();
+}
+
IEngineFlat * TEngineBay::GetEngine() {
if (!Engine) {
Engine = CreateEngineFlat(*EngineSettings);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 9d8ac1ba6f3..6a4b877b7d2 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -99,6 +99,7 @@ public:
void SetWriteVersion(TRowVersion writeVersion);
void SetReadVersion(TRowVersion readVersion);
+ void SetVolatileTxId(ui64 txId);
void SetIsImmediateTx();
void SetIsRepeatableSnapshot();
@@ -107,6 +108,9 @@ public:
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const;
void ResetCollectedChanges();
+ TVector<ui64> GetVolatileCommitTxIds() const;
+ TVector<ui64> GetVolatileDependencies() const;
+
void ResetCounters() { EngineHostCounters = TEngineHostCounters(); }
const TEngineHostCounters& GetCounters() const { return EngineHostCounters; }
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index 626b9afa5b9..fac931dff73 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -171,6 +171,8 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
PRECHARGE_SYS_TABLE(Schema::Locks);
PRECHARGE_SYS_TABLE(Schema::LockRanges);
PRECHARGE_SYS_TABLE(Schema::LockConflicts);
+ PRECHARGE_SYS_TABLE(Schema::TxVolatileDetails);
+ PRECHARGE_SYS_TABLE(Schema::TxVolatileParticipants);
if (!ready)
return false;
@@ -514,6 +516,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}
+ if (Self->State != TShardState::Offline) {
+ if (!Self->VolatileTxManager.Load(db)) {
+ return false;
+ }
+ }
+
Self->SubscribeNewLocks();
Self->ScheduleRemoveAbandonedLockChanges();
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 70f041a9787..4ac1ac3082f 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -761,6 +761,25 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded)
}
plan.push_back(EExecutionUnitKind::FinishPropose);
plan.push_back(EExecutionUnitKind::CompletedOperations);
+ } else if (HasVolatilePrepareFlag()) {
+ Y_VERIFY(!loaded);
+ plan.push_back(EExecutionUnitKind::CheckDataTx);
+ plan.push_back(EExecutionUnitKind::StoreDataTx); // note: stores in memory
+ plan.push_back(EExecutionUnitKind::FinishPropose);
+ Y_VERIFY(!GetStep());
+ plan.push_back(EExecutionUnitKind::WaitForPlan);
+ plan.push_back(EExecutionUnitKind::PlanQueue);
+ plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory
+ plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
+ Y_VERIFY(IsKqpDataTransaction());
+ // Note: execute will also prepare and send readsets
+ plan.push_back(EExecutionUnitKind::ExecuteKqpDataTx);
+ // Note: it is important that plan here is the same as regular
+ // distributed tx, since normal tx may decide to commit in a
+ // volatile manner with dependencies, to avoid waiting for
+ // locked keys to resolve.
+ plan.push_back(EExecutionUnitKind::CompleteOperation);
+ plan.push_back(EExecutionUnitKind::CompletedOperations);
} else {
if (!loaded) {
plan.push_back(EExecutionUnitKind::CheckDataTx);
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 8a1c715d75e..6acc43e9afb 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -179,6 +179,7 @@ public:
void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
+ void SetVolatileTxId(ui64 txId) { EngineBay.SetVolatileTxId(txId); }
void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) {
EngineBay.CommitChanges(tableId, lockId, writeVersion, txc);
@@ -187,6 +188,9 @@ public:
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); }
void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); }
+ TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); }
+ TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); }
+
TActorId Source() const { return Source_; }
void SetSource(const TActorId& actorId) { Source_ = actorId; }
void SetStep(ui64 step) { StepTxId_.Step = step; }
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 04bdf2a5afc..f23838a66ad 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -18,6 +18,7 @@
#include "change_record.h"
#include "progress_queue.h"
#include "read_iterator.h"
+#include "volatile_tx.h"
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
@@ -258,6 +259,7 @@ class TDataShard
friend class NMiniKQL::TKqpScanComputeContext;
friend class TSnapshotManager;
friend class TSchemaSnapshotManager;
+ friend class TVolatileTxManager;
friend class TReplicationSourceOffsetsClient;
friend class TReplicationSourceOffsetsServer;
@@ -870,13 +872,44 @@ class TDataShard
>;
};
+ // Describes a volatile transaction that executed and possibly made
+ // some changes, but is not fully committed, waiting for decision from
+ // other participants. Transaction would be committed at Step:TxId on
+ // success, and usually has 1-2 uncommitted TxIds that need to be
+ // committed.
+ struct TxVolatileDetails : Table<32> {
+ // Volatile TxId of the transaction
+ struct TxId : Column<1, NScheme::NTypeIds::Uint64> {};
+ // State of transaction, initially undecided, but becomes committed or aborted until it is removed
+ struct State : Column<2, NScheme::NTypeIds::Uint32> { using Type = EVolatileTxState; };
+ // Transaction details encoded in a protobuf message
+ struct Details : Column<3, NScheme::NTypeIds::String> { using Type = NKikimrTxDataShard::TTxVolatileDetails; };
+
+ using TKey = TableKey<TxId>;
+ using TColumns = TableColumns<TxId, State, Details>;
+ };
+
+ // Associated participants for a volatile transaction that need to
+ // decide a transaction and from which a readset is expected. Usually a
+ // COMMIT decision from a participant causes removal of a corresponding
+ // row, and ABORT decision causes full transaction abort, with removal
+ // of all corresponding rows.
+ struct TxVolatileParticipants : Table<33> {
+ struct TxId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct ShardId : Column<2, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<TxId, ShardId>;
+ using TColumns = TableColumns<TxId, ShardId>;
+ };
+
using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue,
DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress,
Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts,
SrcChangeSenderActivations, DstChangeSenderActivations,
ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived,
UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts,
- LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits>;
+ LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits,
+ TxVolatileDetails, TxVolatileParticipants>;
// These settings are persisted on each Init. So we use empty settings in order not to overwrite what
// was changed by the user
@@ -1614,6 +1647,10 @@ public:
void AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId,
TTransactionContext& txc, const TActorContext& ctx);
+ TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; }
+ const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; }
+
+
template <typename... Args>
bool PromoteCompleteEdge(Args&&... args) {
return SnapshotManager.PromoteCompleteEdge(std::forward<Args>(args)...);
@@ -2235,6 +2272,7 @@ private:
TSnapshotManager SnapshotManager;
TSchemaSnapshotManager SchemaSnapshotManager;
+ TVolatileTxManager VolatileTxManager;
TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index c0188834c50..08c734b9621 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -660,7 +660,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
- if (kqpTx.HasLocks() && !NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
+ if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
return true;
}
@@ -730,6 +730,132 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks)
return true;
}
+bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
+ auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+
+ if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
+ return true;
+ }
+
+ // Volatile transactions cannot work with non-generic readsets
+ YQL_ENSURE(kqpTx.GetUseGenericReadSets());
+
+ // We may have some stale data since before the restart
+ tx->OutReadSets().clear();
+ tx->AwaitingDecisions().clear();
+
+ // Note: usually all shards send locks, since they either have side effects or need to validate locks
+ // However it is technically possible to have pure-read shards, that don't contribute to the final decision
+ bool sendLocks = SendLocks(kqpTx.GetLocks(), origin);
+ if (sendLocks) {
+ // Note: it is possible to have no locks
+ auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin);
+
+ if (!brokenLocks.empty()) {
+ tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+ NKikimrTxDataShard::TX_KIND_DATA,
+ origin,
+ tx->GetTxId(),
+ NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+
+ auto* protoLocks = tx->Result()->Record.MutableTxLocks();
+ for (auto& brokenLock : brokenLocks) {
+ protoLocks->Add()->Swap(&brokenLock);
+ }
+
+ return false;
+ }
+
+ // We need to form decision readsets for all other participants
+ for (ui64 dstTabletId : kqpTx.GetLocks().GetReceivingShards()) {
+ if (dstTabletId == origin) {
+ // Don't send readsets to ourselves
+ continue;
+ }
+
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Send commit decision from "
+ << origin << " to " << dstTabletId);
+
+ auto key = std::make_pair(origin, dstTabletId);
+ NKikimrTx::TReadSetData data;
+ data.SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT);
+
+ TString bodyStr;
+ bool ok = data.SerializeToString(&bodyStr);
+ Y_VERIFY(ok, "Failed to serialize readset from %" PRIu64 " to %" PRIu64, key.first, key.second);
+
+ tx->OutReadSets()[key] = std::move(bodyStr);
+ }
+ }
+
+ bool receiveLocks = ReceiveLocks(kqpTx.GetLocks(), origin);
+ if (receiveLocks) {
+ // Note: usually only shards with side-effects receive locks, since they
+ // need the final outcome to decide whether to commit or abort.
+ for (ui64 srcTabletId : kqpTx.GetLocks().GetSendingShards()) {
+ if (srcTabletId == origin) {
+ // Don't await decision from ourselves
+ continue;
+ }
+
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Will wait for volatile decision from "
+ << srcTabletId << " to " << origin);
+
+ tx->AwaitingDecisions().insert(srcTabletId);
+ }
+
+ bool aborted = false;
+
+ for (auto& record : tx->DelayedInReadSets()) {
+ ui64 srcTabletId = record.GetTabletSource();
+ ui64 dstTabletId = record.GetTabletDest();
+ if (!tx->AwaitingDecisions().contains(srcTabletId) || dstTabletId != origin) {
+ // Don't process unexpected readsets
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Ignoring unexpected readset from "
+ << srcTabletId << " to " << dstTabletId << " for txId# " << tx->GetTxId() << " at tablet " << origin);
+ continue;
+ }
+
+ if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
+ // No readset data: participant aborted the transaction
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed no data readset from"
+ << srcTabletId << " to " << dstTabletId << " will abort txId# " << tx->GetTxId());
+ aborted = true;
+ break;
+ }
+
+ NKikimrTx::TReadSetData data;
+ bool ok = data.ParseFromString(record.GetReadSet());
+ Y_VERIFY(ok, "Failed to parse readset from %" PRIu64 " to %" PRIu64, srcTabletId, dstTabletId);
+
+ if (data.GetDecision() != NKikimrTx::TReadSetData::DECISION_COMMIT) {
+ // Explicit decision that is not a commit, need to abort
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed decision "
+ << ui32(data.GetDecision()) << " from " << srcTabletId << " to " << dstTabletId
+ << " for txId# " << tx->GetTxId());
+ aborted = true;
+ break;
+ }
+
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed commit decision from "
+ << srcTabletId << " to " << dstTabletId << " for txId# " << tx->GetTxId());
+ tx->AwaitingDecisions().erase(srcTabletId);
+ }
+
+ if (aborted) {
+ tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+ NKikimrTxDataShard::TX_KIND_DATA,
+ origin,
+ tx->GetTxId(),
+ NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED);
+
+ return false;
+ }
+ }
+
+ return true;
+}
+
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 52086ea0f70..458a103f912 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -35,6 +35,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId);
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
+bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc);
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index bb2a1ad6ef8..6e57ee9bec4 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -110,11 +110,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
}
void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
- for (auto& pr : Self.GetUserTables()) {
- auto tid = pr.second->LocalTid;
- // Removing the lock also removes any uncommitted data
- if (DB.HasOpenTx(tid, lockId)) {
- DB.RemoveTx(tid, lockId);
+ // We remove lock changes unless it's managed by volatile tx manager
+ if (!Self.GetVolatileTxManager().FindByCommitTxId(lockId)) {
+ for (auto& pr : Self.GetUserTables()) {
+ auto tid = pr.second->LocalTid;
+ // Removing the lock also removes any uncommitted data
+ if (DB.HasOpenTx(tid, lockId)) {
+ DB.RemoveTx(tid, lockId);
+ }
}
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 6f95c80acad..c30648109ef 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1188,7 +1188,18 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};
+ auto badRequest = [&](const TString& error) {
+ tx->SetAbortedFlag();
+ tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(
+ rec.GetTxKind(), Self->TabletID(), tx->GetTxId(), NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST));
+ tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT, error);
+
+ LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
+ };
+
if (tx->IsSchemeTx()) {
+ Y_VERIFY(!tx->HasVolatilePrepareFlag(), "Volatile scheme transactions not supported");
+
Y_VERIFY(rec.HasSchemeShardId());
Y_VERIFY(rec.HasProcessingParams());
@@ -1216,6 +1227,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
return tx;
}
+ if (tx->HasVolatilePrepareFlag()) {
+ badRequest("Unsupported volatile snapshot tx");
+ return tx;
+ }
+
if (tx->GetSnapshotTx().HasCreateVolatileSnapshot()) {
tx->SetReadOnlyFlag();
tx->SetGlobalReaderFlag();
@@ -1231,6 +1247,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
return tx;
}
+ if (tx->HasVolatilePrepareFlag()) {
+ badRequest("Unsupported volatile distributed erase");
+ return tx;
+ }
+
tx->SetGlobalWriterFlag();
if (tx->GetDistributedEraseTx()->HasDependents()) {
tx->SetGlobalReaderFlag();
@@ -1241,6 +1262,11 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
return tx;
}
+ if (tx->HasVolatilePrepareFlag()) {
+ badRequest("Unsupported volatile commit writes");
+ return tx;
+ }
+
tx->SetGlobalWriterFlag();
} else {
Y_VERIFY(tx->IsReadTable() || tx->IsDataTx());
@@ -1277,6 +1303,35 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
tx->SetGlobalReaderFlag();
}
+ // Additional checks for volatile transactions
+ if (tx->HasVolatilePrepareFlag()) {
+ if (tx->HasImmediateFlag()) {
+ badRequest(TStringBuilder()
+ << "Volatile distributed tx " << tx->GetTxId()
+ << " at tablet " << Self->TabletID()
+ << " cannot be immediate");
+ return tx;
+ }
+
+ if (!dataTx->IsKqpDataTx()) {
+ badRequest(TStringBuilder()
+ << "Volatile distributed tx " << tx->GetTxId()
+ << " at tablet " << Self->TabletID()
+ << " must be a kqp data tx");
+ return tx;
+ }
+
+ if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
+ badRequest(TStringBuilder()
+ << "Volatile distributed tx " << tx->GetTxId()
+ << " at tablet " << Self->TabletID()
+ << " cannot have persistent channels");
+ return tx;
+ }
+
+ Y_VERIFY(!tx->IsImmediate(), "Sanity check failed: volatile tx cannot be immediate");
+ }
+
// Make config checks for immediate tx.
if (tx->IsImmediate()) {
if (Config.NoImmediate() || (Config.ForceOnlineRW() && !dataTx->ReadOnly())) {
@@ -1310,26 +1365,10 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
if (!tx->IsMvccSnapshotRead()) {
// No op
} else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) {
- tx->SetAbortedFlag();
- TString err = "Ambiguous snapshot info. Cannot use both MVCC and read table snapshots in one transaction";
- tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(),
- Self->TabletID(),
- tx->GetTxId(),
- NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST));
- tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
- LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err);
-
+ badRequest("Ambiguous snapshot info. Cannot use both MVCC and read table snapshots in one transaction");
return tx;
} else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) {
- tx->SetAbortedFlag();
- TString err = "Ambiguous snapshot info. Cannot use both MVCC and kqp scan snapshots in one transaction";
- tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(),
- Self->TabletID(),
- tx->GetTxId(),
- NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST));
- tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT,err);
- LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err);
-
+ badRequest("Ambiguous snapshot info. Cannot use both MVCC and kqp scan snapshots in one transaction");
return tx;
}
@@ -1350,15 +1389,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
};
if(tx->IsMvccSnapshotRead() && !allowSnapshot()) {
- tx->SetAbortedFlag();
- TString err = "Snapshot read must be an immediate read only or locked write transaction";
- tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(),
- Self->TabletID(),
- tx->GetTxId(),
- NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST));
- tx->Result()->SetProcessError(NKikimrTxDataShard::TError::BAD_ARGUMENT,err);
- LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, err);
-
+ badRequest("Snapshot read must be an immediate read-only or locked-write transaction");
return tx;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 0258c9c2e3b..efec2da9e31 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -33,23 +33,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
- auto forceVolatile = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
- switch (ev->GetTypeRewrite()) {
- case TEvDataShard::TEvProposeTransaction::EventType: {
- auto* msg = ev->Get<TEvDataShard::TEvProposeTransaction>();
- auto flags = msg->Record.GetFlags();
- if (!(flags & TTxFlags::Immediate)) {
- Cerr << "... forcing propose to use volatile prepare" << Endl;
- flags |= TTxFlags::VolatilePrepare;
- msg->Record.SetFlags(flags);
- }
- break;
- }
- }
- return TTestActorRuntimeBase::EEventAction::PROCESS;
- };
- auto prevObserverFunc = runtime.SetObserverFunc(forceVolatile);
-
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
Cerr << "!!! distributed write start" << Endl;
@@ -59,8 +43,24 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
)");
Cerr << "!!! distributed write end" << Endl;
- runtime.SetObserverFunc(prevObserverFunc);
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } }");
+
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ RebootTablet(runtime, shards1.at(0), sender);
+ // We should see same results after restart
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table-1`
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index d211051c9ab..54365b8bef2 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -166,7 +166,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
}
- if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) {
+ const bool validated = op->HasVolatilePrepareFlag()
+ ? KqpValidateVolatileTx(tabletId, tx, DataShard.SysLocksTable())
+ : KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable());
+
+ if (!validated) {
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
DataShard.SysLocksTable().ApplyLocks();
DataShard.SubscribeNewLocks(ctx);
@@ -199,6 +203,10 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->SetReadVersion(readVersion);
dataTx->SetWriteVersion(writeVersion);
+ if (op->HasVolatilePrepareFlag()) {
+ dataTx->SetVolatileTxId(tx->GetTxId());
+ }
+
KqpCommitLocks(tabletId, tx, writeVersion, DataShard, txc);
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
@@ -272,15 +280,33 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
DataShard.SysLocksTable().BreakSetLocks();
}
+ // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
+ // Such transactions would have no participants and become immediately committed
+ auto commitTxIds = dataTx->GetVolatileCommitTxIds();
+ if (commitTxIds) {
+ TVector<ui64> participants(tx->AwaitingDecisions().begin(), tx->AwaitingDecisions().end());
+ DataShard.GetVolatileTxManager().PersistAddVolatileTx(
+ tx->GetTxId(),
+ writeVersion,
+ commitTxIds,
+ dataTx->GetVolatileDependencies(),
+ participants,
+ txc);
+ }
+
+ if (op->HasVolatilePrepareFlag() && !op->OutReadSets().empty()) {
+ DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx);
+ }
+
+ // Note: may erase persistent locks, must be after we persist volatile tx
AddLocksToResult(op, ctx);
- auto changes = std::move(dataTx->GetCollectedChanges());
- if (guardLocks.LockTxId) {
- if (changes) {
- DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes));
+ if (auto changes = std::move(dataTx->GetCollectedChanges())) {
+ if (commitTxIds || guardLocks.LockTxId) {
+ DataShard.AddLockChangeRecords(commitTxIds ? tx->GetTxId() : guardLocks.LockTxId, std::move(changes));
+ } else {
+ op->ChangeRecords() = std::move(changes);
}
- } else {
- op->ChangeRecords() = std::move(changes);
}
KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters());
@@ -351,12 +377,14 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds());
op->ResetCurrentTimer();
- if (op->IsReadOnly() && !locksDb.HasChanges()) {
+ if (txc.DB.HasChanges()) {
+ op->SetWaitCompletionFlag(true);
+ } else if (op->IsReadOnly()) {
return EExecutionStatus::Executed;
}
- if (locksDb.HasChanges()) {
- op->SetWaitCompletionFlag(true);
+ if (op->HasVolatilePrepareFlag() && !op->PreparedOutReadSets().empty()) {
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
return EExecutionStatus::ExecutedNoMoreRestarts;
@@ -395,7 +423,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx,
return EExecutionStatus::Restart;
}
-void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {}
+void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr op, const TActorContext& ctx) {
+ if (op->HasVolatilePrepareFlag() && !op->PreparedOutReadSets().empty()) {
+ DataShard.SendReadSets(ctx, std::move(op->PreparedOutReadSets()));
+ }
+}
THolder<TExecutionUnit> CreateExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) {
return THolder(new TExecuteKqpDataTxUnit(dataShard, pipeline));
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index fb182f810ca..ca43b25cabe 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -14,6 +14,7 @@
#include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
#include <ydb/core/tx/tx_processing.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
#include <library/cpp/containers/flat_hash/flat_hash.h>
#include <util/generic/hash.h>
@@ -419,6 +420,7 @@ struct TInputOpData {
using TSnapshots = TVector<TIntrusivePtr<TTableSnapshotContext>>;
using TInReadSets = TMap<std::pair<ui64, ui64>, TVector<TRSData>>;
using TCoverageBuilders = TMap<std::pair<ui64, ui64>, std::shared_ptr<TBalanceCoverageBuilder>>;
+ using TAwaitingDecisions = absl::flat_hash_set<ui64>;
TInputOpData()
: RemainReadSets(0)
@@ -435,6 +437,7 @@ struct TInputOpData {
ui32 RemainReadSets;
TAutoPtr<IDestructable> ScanResult;
TAutoPtr<IDestructable> AsyncJobResult;
+ TAwaitingDecisions AwaitingDecisions;
};
struct TOutputOpData {
@@ -590,6 +593,8 @@ public:
void SetAsyncJobResult(TAutoPtr<IDestructable> prod) { InputDataRef().AsyncJobResult = prod; }
bool HasAsyncJobResult() const { return InputData ? (bool)InputData->AsyncJobResult : false; }
+ TInputOpData::TAwaitingDecisions &AwaitingDecisions() { return InputDataRef().AwaitingDecisions; }
+
////////////////////////////////////////
// OUTPUT DATA //
////////////////////////////////////////
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
new file mode 100644
index 00000000000..06ecb6c927f
--- /dev/null
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -0,0 +1,281 @@
+#include "volatile_tx.h"
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+ void TVolatileTxManager::TTxMap::Add(ui64 txId, TRowVersion version) {
+ Map[txId] = version;
+ }
+
+ void TVolatileTxManager::TTxMap::Remove(ui64 txId) {
+ Map.erase(txId);
+ }
+
+ const TRowVersion* TVolatileTxManager::TTxMap::Find(ui64 txId) const {
+ auto it = Map.find(txId);
+ if (it != Map.end()) {
+ return &it->second;
+ }
+ return nullptr;
+ }
+
+ void TVolatileTxManager::Clear() {
+ VolatileTxs.clear();
+ VolatileTxByCommitTxId.clear();
+ TxMap.Reset();
+ }
+
+ bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) {
+ using Schema = TDataShard::Schema;
+
+ Y_VERIFY(VolatileTxs.empty() && VolatileTxByCommitTxId.empty() && !TxMap,
+ "Unexpected Load into non-empty volatile tx manager");
+
+ // Tables may not exist in some inactive shards, which cannot have transactions
+ if (db.HaveTable<Schema::TxVolatileDetails>() &&
+ db.HaveTable<Schema::TxVolatileParticipants>())
+ {
+ if (!LoadTxDetails(db)) {
+ return false;
+ }
+ if (!LoadTxParticipants(db)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ bool TVolatileTxManager::LoadTxDetails(NIceDb::TNiceDb& db) {
+ using Schema = TDataShard::Schema;
+
+ auto rowset = db.Table<Schema::TxVolatileDetails>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ ui64 txId = rowset.GetValue<Schema::TxVolatileDetails::TxId>();
+ EVolatileTxState state = rowset.GetValue<Schema::TxVolatileDetails::State>();
+ auto details = rowset.GetValue<Schema::TxVolatileDetails::Details>();
+
+ Y_VERIFY_S(txId == details.GetTxId(),
+ "Volatile txId# " << txId << " has unexpected details with txId# " << details.GetTxId());
+
+ auto res = VolatileTxs.insert(
+ std::make_pair(txId, std::make_unique<TVolatileTxInfo>()));
+ Y_VERIFY_S(res.second, "Unexpected duplicate volatile txId# " << txId);
+
+ auto* info = res.first->second.get();
+ info->TxId = txId;
+ info->State = state;
+ info->Version = TRowVersion(details.GetVersionStep(), details.GetVersionTxId());
+ info->CommitTxIds.insert(details.GetCommitTxIds().begin(), details.GetCommitTxIds().end());
+ info->Dependencies.insert(details.GetDependencies().begin(), details.GetDependencies().end());
+ info->AddCommitted = true; // we loaded it from local db, so it is committed
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ auto postProcessTxInfo = [this](TVolatileTxInfo* info) {
+ switch (info->State) {
+ case EVolatileTxState::Waiting:
+ case EVolatileTxState::Committed: {
+ if (!TxMap) {
+ TxMap = MakeIntrusive<TTxMap>();
+ }
+
+ // Waiting and Committed transactions need to be added to TxMap until they are fully resolved
+ // Note that aborted transactions are removed from TxMap and don't need to be re-added
+ for (ui64 commitTxId : info->CommitTxIds) {
+ auto res2 = VolatileTxByCommitTxId.emplace(commitTxId, info);
+ Y_VERIFY_S(res2.second, "Unexpected duplicate commitTxId# " << commitTxId);
+ TxMap->Add(commitTxId, info->Version);
+ }
+
+ for (auto it = info->Dependencies.begin(); it != info->Dependencies.end(); /* nothing */) {
+ ui64 dependencyTxId = *it;
+ auto* dependency = FindByTxId(dependencyTxId);
+ if (!dependency || dependency->State == EVolatileTxState::Aborted) {
+ // Skip dependencies that have been aborted already
+ info->Dependencies.erase(it++);
+ continue;
+ }
+ dependency->Dependents.insert(info->TxId);
+ ++it;
+ }
+
+ if (info->Dependencies.empty() && info->State == EVolatileTxState::Committed) {
+ // TODO: committed transactions without dependencies are ready to fully commit
+ }
+
+ return;
+ }
+
+ case EVolatileTxState::Aborted: {
+ // Aborted transactions don't have dependencies
+ info->Dependencies.clear();
+
+ // TODO: aborted transactions are ready to be rolled back
+ return;
+ }
+ }
+
+ Y_VERIFY_S(false, "Unexpected volatile txId# " << info->TxId << " @" << info->Version << " with state# " << ui32(info->State));
+ };
+
+ for (auto& pr : VolatileTxs) {
+ postProcessTxInfo(pr.second.get());
+ }
+
+ return true;
+ }
+
+ bool TVolatileTxManager::LoadTxParticipants(NIceDb::TNiceDb& db) {
+ using Schema = TDataShard::Schema;
+
+ auto rowset = db.Table<Schema::TxVolatileParticipants>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ TVolatileTxInfo* lastInfo = nullptr;
+ while (!rowset.EndOfSet()) {
+ ui64 txId = rowset.GetValue<Schema::TxVolatileParticipants::TxId>();
+ ui64 shardId = rowset.GetValue<Schema::TxVolatileParticipants::ShardId>();
+
+ auto* info = (lastInfo && lastInfo->TxId == txId) ? lastInfo : FindByTxId(txId);
+ Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId);
+
+ // Only waiting transactions may have participants
+ Y_VERIFY_S(info->State == EVolatileTxState::Waiting,
+ "Unexpected volatile txId# " << txId << " with participant# " << shardId
+ << " in state# " << ui32(info->State));
+
+ info->Participants.insert(shardId);
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ for (auto& pr : VolatileTxs) {
+ auto* info = pr.second.get();
+
+ // Sanity check that are are no waiting transactions without participants
+ if (info->State == EVolatileTxState::Waiting) {
+ Y_VERIFY_S(!info->Participants.empty(),
+ "Unexpected waiting volatile txId# " << info->TxId << " without participants");
+ }
+ }
+
+ return true;
+ }
+
+ TVolatileTxInfo* TVolatileTxManager::FindByTxId(ui64 txId) const {
+ auto it = VolatileTxs.find(txId);
+ if (it != VolatileTxs.end()) {
+ return it->second.get();
+ }
+ return nullptr;
+ }
+
+ TVolatileTxInfo* TVolatileTxManager::FindByCommitTxId(ui64 txId) const {
+ auto it = VolatileTxByCommitTxId.find(txId);
+ if (it != VolatileTxByCommitTxId.end()) {
+ return it->second;
+ }
+ return nullptr;
+ }
+
+ void TVolatileTxManager::PersistAddVolatileTx(
+ ui64 txId, const TRowVersion& version,
+ TConstArrayRef<ui64> commitTxIds,
+ TConstArrayRef<ui64> dependencies,
+ TConstArrayRef<ui64> participants,
+ TTransactionContext& txc)
+ {
+ using Schema = TDataShard::Schema;
+
+ Y_VERIFY_S(!commitTxIds.empty(),
+ "Unexpected volatile txId# " << txId << " @" << version << " without commits");
+
+ auto res = VolatileTxs.insert(
+ std::make_pair(txId, std::make_unique<TVolatileTxInfo>()));
+ Y_VERIFY_S(res.second, "Cannot add volatile txId# " << txId << " @" << version
+ << ": duplicate volatile tx @" << res.first->second->Version << " already exists");
+
+ auto* info = res.first->second.get();
+ info->TxId = txId;
+ info->Version = version;
+ info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end());
+ info->Dependencies.insert(dependencies.begin(), dependencies.end());
+ info->Participants.insert(participants.begin(), participants.end());
+
+ if (info->Participants.empty()) {
+ // Transaction is committed when we don't have to wait for other participants
+ info->State = EVolatileTxState::Committed;
+ }
+
+ if (!TxMap) {
+ TxMap = MakeIntrusive<TTxMap>();
+ }
+
+ for (ui64 commitTxId : commitTxIds) {
+ auto res2 = VolatileTxByCommitTxId.emplace(commitTxId, info);
+ Y_VERIFY_S(res2.second, "Cannot add volatile txId# " << txId << " @" << version << " with commitTxId# " << commitTxId
+ << ": already registered for txId# " << res.first->second->TxId << " @" << res.first->second->Version);
+ TxMap->Add(commitTxId, version);
+ }
+
+ for (ui64 dependencyTxId : info->Dependencies) {
+ auto* dependency = FindByTxId(dependencyTxId);
+ Y_VERIFY_S(dependency, "Cannot find dependency txId# " << dependencyTxId
+ << " for volatile txId# " << txId << " @" << version);
+ Y_VERIFY_S(dependency->State != EVolatileTxState::Aborted,
+ "Unexpected aborted dependency txId# " << dependencyTxId
+ << " for volatile txId# " << txId << " @" << version);
+ dependency->Dependents.insert(txId);
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ NKikimrTxDataShard::TTxVolatileDetails details;
+ details.SetTxId(txId);
+ details.SetVersionStep(version.Step);
+ details.SetVersionTxId(version.TxId);
+
+ if (!info->CommitTxIds.empty()) {
+ auto* m = details.MutableCommitTxIds();
+ m->Add(info->CommitTxIds.begin(), info->CommitTxIds.end());
+ std::sort(m->begin(), m->end());
+ }
+
+ if (!info->Dependencies.empty()) {
+ auto* m = details.MutableDependencies();
+ m->Add(info->Dependencies.begin(), info->Dependencies.end());
+ std::sort(m->begin(), m->end());
+ }
+
+ db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update(
+ NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State),
+ NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details)));
+ for (ui64 shardId : participants) {
+ db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update();
+ }
+
+ txc.OnCommit([this, txId]() {
+ auto* info = FindByTxId(txId);
+ Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId);
+ Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId);
+ info->AddCommitted = true;
+ // TODO: activate pending responses
+ });
+ txc.OnRollback([txId]() {
+ Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId);
+ });
+ }
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
new file mode 100644
index 00000000000..6c509a456af
--- /dev/null
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -0,0 +1,135 @@
+#pragma once
+#include <ydb/core/tablet_flat/flat_table_committed.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
+#include <util/generic/hash.h>
+
+namespace NKikimr::NTabletFlatExecutor {
+
+ class TTransactionContext;
+
+} // namespace NKikimr::NTabletFlatExecutor
+
+namespace NKikimr::NIceDb {
+
+ class TNiceDb;
+
+} // namespace NKikimr::NIceDb
+
+namespace NKikimr::NDataShard {
+
+ class TDataShard;
+
+ enum class EVolatileTxState : ui32 {
+ // Volatile transaction is waiting for decisions from other participants
+ Waiting = 0,
+ // Volatile transaction is logically committed, but not yet committed in storage
+ Committed = 1,
+ // Volatile transaction is aborted, waiting to be garbage collected
+ Aborted = 2,
+ };
+
+ struct TVolatileTxInfo {
+ ui64 TxId;
+ EVolatileTxState State = EVolatileTxState::Waiting;
+ TRowVersion Version;
+ absl::flat_hash_set<ui64> CommitTxIds;
+ absl::flat_hash_set<ui64> Dependencies;
+ absl::flat_hash_set<ui64> Dependents;
+ absl::flat_hash_set<ui64> Participants;
+ bool AddCommitted = false;
+ };
+
+ class TVolatileTxManager {
+ using TTransactionContext = NKikimr::NTabletFlatExecutor::TTransactionContext;
+
+ private:
+ /**
+ * A custom implementation of tx map without any copy-on-write support,
+ * instead the map is changed directly whenever the set of transaction
+ * changes, possibly affecting existing iterators. Since this tx map
+ * is never used for scans it should be safe to do.
+ */
+ class TTxMap final : public NTable::ITransactionMap {
+ public:
+ void Add(ui64 txId, TRowVersion version);
+ void Remove(ui64 txId);
+
+ bool Empty() const {
+ return Map.empty();
+ }
+
+ const TRowVersion* Find(ui64 txId) const override;
+
+ private:
+ absl::flat_hash_map<ui64, TRowVersion> Map;
+ };
+
+ /**
+ * A helper access class that automatically converts to the optimal
+ * executor argument type.
+ */
+ class TTxMapAccess {
+ public:
+ TTxMapAccess(const TIntrusivePtr<TTxMap>& txMap)
+ : TxMap(txMap)
+ { }
+
+ explicit operator bool() const {
+ return TxMap && !TxMap->Empty();
+ }
+
+ operator NTable::ITransactionMapPtr() const {
+ if (TxMap && !TxMap->Empty()) {
+ return TxMap;
+ } else {
+ return nullptr;
+ }
+ }
+
+ operator NTable::ITransactionMapSimplePtr() const {
+ if (TxMap && !TxMap->Empty()) {
+ return static_cast<NTable::ITransactionMap*>(TxMap.Get());
+ } else {
+ return nullptr;
+ }
+ }
+
+ private:
+ const TIntrusivePtr<TTxMap>& TxMap;
+ };
+
+ public:
+ TVolatileTxManager(TDataShard* self)
+ : Self(self)
+ { }
+
+ void Clear();
+ bool Load(NIceDb::TNiceDb& db);
+
+ TVolatileTxInfo* FindByTxId(ui64 txId) const;
+ TVolatileTxInfo* FindByCommitTxId(ui64 txId) const;
+
+ void PersistAddVolatileTx(
+ ui64 txId, const TRowVersion& version,
+ TConstArrayRef<ui64> commitTxIds,
+ TConstArrayRef<ui64> dependencies,
+ TConstArrayRef<ui64> participants,
+ TTransactionContext& txc);
+
+ TTxMapAccess GetTxMap() const {
+ return TTxMapAccess(TxMap);
+ }
+
+ private:
+ bool LoadTxDetails(NIceDb::TNiceDb& db);
+ bool LoadTxParticipants(NIceDb::TNiceDb& db);
+
+ private:
+ TDataShard* const Self;
+ absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
+ absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
+ TIntrusivePtr<TTxMap> TxMap;
+ };
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index ceb05783686..20d676d546b 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1725,6 +1725,95 @@
}
},
{
+ "TableId": 32,
+ "TableName": "TxVolatileDetails",
+ "TableKey": [
+ 1
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "TxId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "State",
+ "ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "Details",
+ "ColumnType": "String"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
+ "TableId": 33,
+ "TableName": "TxVolatileParticipants",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "TxId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "ShardId",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
"TableId": 101,
"TableName": "LockChangeRecords",
"TableKey": [