aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-12-06 16:37:57 +0300
committersnaury <snaury@ydb.tech>2022-12-06 16:37:57 +0300
commitf34f958708d7a10869e32bfbffb2cd4dd79cf1c1 (patch)
tree313c03535bd396e7d007d77880592a6a83a12c98
parent0ca041dc2b282b64373ae3197952331ceaa430e5 (diff)
downloadydb-f34f958708d7a10869e32bfbffb2cd4dd79cf1c1.tar.gz
Start introducing volatile distributed transactions
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/datashard.cpp53
-rw-r--r--ydb/core/tx/datashard/datashard.h5
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp26
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h3
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h13
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard_trans_queue.cpp61
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp79
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/operation.cpp5
-rw-r--r--ydb/core/tx/datashard/operation.h13
-rw-r--r--ydb/core/tx/datashard/store_data_tx_unit.cpp5
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt56
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt58
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt60
-rw-r--r--ydb/core/tx/datashard/ut_volatile/CMakeLists.txt15
17 files changed, 449 insertions, 26 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 3101a8e04f0..5658b055b84 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -35,6 +35,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_testload)
add_subdirectory(ut_upload_rows)
+add_subdirectory(ut_volatile)
add_library(core-tx-datashard)
target_compile_options(core-tx-datashard PRIVATE
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 3ad5b986bb3..757ddf7068c 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1931,15 +1931,35 @@ void TDataShard::SendImmediateWriteResult(
}
}
-void TDataShard::SendImmediateReadResult(
- TMonotonic readTime,
+TMonotonic TDataShard::ConfirmReadOnlyLease() {
+ if (IsFollower() || !ReadOnlyLeaseEnabled()) {
+ // Do nothing and return an empty timestamp
+ return {};
+ }
+
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ Executor()->ConfirmReadOnlyLease(ts);
+ return ts;
+}
+
+void TDataShard::ConfirmReadOnlyLease(TMonotonic ts) {
+ if (IsFollower() || !ReadOnlyLeaseEnabled()) {
+ // Do nothing
+ return;
+ }
+
+ Executor()->ConfirmReadOnlyLease(ts);
+}
+
+void TDataShard::SendWithConfirmedReadOnlyLease(
+ TMonotonic ts,
const TActorId& target,
IEventBase* event,
ui64 cookie,
const TActorId& sessionId)
{
if (IsFollower() || !ReadOnlyLeaseEnabled()) {
- // We just send possibly stale result (old behavior)
+ // Send possibly stale result (legacy behavior)
if (!sessionId) {
Send(target, event, 0, cookie);
} else {
@@ -1962,23 +1982,42 @@ void TDataShard::SendImmediateReadResult(
}
};
- if (!readTime) {
- readTime = AppData()->MonotonicTimeProvider->Now();
+ if (!ts) {
+ ts = AppData()->MonotonicTimeProvider->Now();
}
- Executor()->ConfirmReadOnlyLease(readTime,
+ Executor()->ConfirmReadOnlyLease(ts,
[state = MakeIntrusive<TSendState>(sessionId, target, SelfId(), event, cookie)] {
TActivationContext::Send(state->Ev.Release());
});
}
+void TDataShard::SendWithConfirmedReadOnlyLease(
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie,
+ const TActorId& sessionId)
+{
+ SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId);
+}
+
+void TDataShard::SendImmediateReadResult(
+ TMonotonic readTime,
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie,
+ const TActorId& sessionId)
+{
+ SendWithConfirmedReadOnlyLease(readTime, target, event, cookie, sessionId);
+}
+
void TDataShard::SendImmediateReadResult(
const TActorId& target,
IEventBase* event,
ui64 cookie,
const TActorId& sessionId)
{
- SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie, sessionId);
+ SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId);
}
void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 1a3b91d68a2..a2237a97ca0 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -37,6 +37,11 @@ namespace NDataShard {
ForceOnline = 0x04,
Immediate = 0x08,
+ // Transaction must be prepared as a ditributed transaction, but
+ // must also be volatile, i.e. expect that other participants may
+ // cancel it even after it is planned.
+ VolatilePrepare = 0x10,
+
PublicFlagsMask = 0x000000000000FFFF,
//////////////////////////////
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 159d0b516f2..70f041a9787 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -348,7 +348,7 @@ TActiveTransaction::~TActiveTransaction()
void TActiveTransaction::FillTxData(TValidatedDataTx::TPtr dataTx)
{
Y_VERIFY(!DataTx);
- Y_VERIFY(TxBody.empty());
+ Y_VERIFY(TxBody.empty() || HasVolatilePrepareFlag());
Target = dataTx->Source();
DataTx = dataTx;
@@ -397,6 +397,30 @@ void TActiveTransaction::FillTxData(TDataShard *self,
TrackMemory();
}
+void TActiveTransaction::FillVolatileTxData(TDataShard *self,
+ TTransactionContext &txc,
+ const TActorContext &ctx)
+{
+ UntrackMemory();
+
+ Y_VERIFY(!DataTx);
+ Y_VERIFY(!TxBody.empty());
+
+ if (IsDataTx() || IsReadTable()) {
+ BuildDataTx(self, txc, ctx);
+ Y_VERIFY(DataTx->Ready());
+
+ if (DataTx->HasStreamResponse())
+ SetStreamSink(DataTx->GetSink());
+ } else if (IsSnapshotTx()) {
+ BuildSnapshotTx();
+ } else {
+ Y_FAIL("Unexpected FillVolatileTxData call");
+ }
+
+ TrackMemory();
+}
+
TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
TTransactionContext &txc,
const TActorContext &ctx)
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index ce629bb4b27..8a1c715d75e 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -354,6 +354,9 @@ public:
const TString &txBody,
const TVector<TSysTables::TLocksTable::TLock> &locks,
ui64 artifactFlags);
+ void FillVolatileTxData(TDataShard *self,
+ TTransactionContext &txc,
+ const TActorContext &ctx);
const TString &GetTxBody() const { return TxBody; }
void SetTxBody(const TString &txBody) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index b20bce5ed4b..04bdf2a5afc 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1645,6 +1645,19 @@ public:
ui64 GetMaxObservedStep() const;
void SendImmediateWriteResult(
const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ TMonotonic ConfirmReadOnlyLease();
+ void ConfirmReadOnlyLease(TMonotonic ts);
+ void SendWithConfirmedReadOnlyLease(
+ TMonotonic ts,
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie = 0,
+ const TActorId& sessionId = {});
+ void SendWithConfirmedReadOnlyLease(
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie = 0,
+ const TActorId& sessionId = {});
void SendImmediateReadResult(
TMonotonic readTime,
const TActorId& target,
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 13df5952495..6f95c80acad 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -531,6 +531,18 @@ bool TPipeline::LoadTxDetails(TTransactionContext &txc,
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"LoadTxDetails at " << Self->TabletID() << " got data tx from cache "
<< tx->GetStep() << ":" << tx->GetTxId());
+ } else if (tx->HasVolatilePrepareFlag()) {
+ // Since transaction is volatile it was never stored on disk, and it
+ // shouldn't have any artifacts yet.
+ tx->FillVolatileTxData(Self, txc, ctx);
+
+ ui32 keysCount = 0;
+ keysCount = tx->ExtractKeys();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "LoadTxDetails at " << Self->TabletID() << " loaded tx from memory "
+ << tx->GetStep() << ":" << tx->GetTxId() << " keys extracted: "
+ << keysCount);
} else {
NIceDb::TNiceDb db(txc.DB);
TActorId target;
@@ -993,7 +1005,9 @@ void TPipeline::ProposeTx(TOperation::TPtr op, const TStringBuf &txBody, TTransa
{
NIceDb::TNiceDb db(txc.DB);
SetProposed(op->GetTxId(), op->GetTarget());
- PreserveSchema(db, op->GetMaxStep());
+ if (!op->HasVolatilePrepareFlag()) {
+ PreserveSchema(db, op->GetMaxStep());
+ }
Self->TransQueue.ProposeTx(db, op, op->GetTarget(), txBody);
if (Self->IsStopping() && op->GetTarget()) {
// Send notification if we prepared a tx while shard was stopping
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index d7096c25b32..b6e07f742ca 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -225,6 +225,12 @@ void TTransQueue::ProposeTx(NIceDb::TNiceDb& db, TOperation::TPtr op, TActorId s
const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask;
AddTxInFly(op);
+
+ if (op->HasVolatilePrepareFlag()) {
+ // We keep volatile transactions in memory and don't store anything
+ return;
+ }
+
db.Table<Schema::TxMain>().Key(op->GetTxId()).Update(
NIceDb::TUpdate<Schema::TxMain::Kind>(op->GetKind()),
NIceDb::TUpdate<Schema::TxMain::Flags>(op->GetFlags() & preserveFlagsMask),
@@ -245,7 +251,10 @@ void TTransQueue::ProposeTx(NIceDb::TNiceDb& db, TOperation::TPtr op, TActorId s
void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) {
using Schema = TDataShard::Schema;
- Y_VERIFY(TxsInFly.contains(txId));
+ auto it = TxsInFly.find(txId);
+ Y_VERIFY(it != TxsInFly.end());
+
+ Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxFlags for a volatile transaction");
const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask;
@@ -257,6 +266,11 @@ void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) {
void TTransQueue::UpdateTxBody(NIceDb::TNiceDb& db, ui64 txId, const TStringBuf& txBody) {
using Schema = TDataShard::Schema;
+ auto it = TxsInFly.find(txId);
+ Y_VERIFY(it != TxsInFly.end());
+
+ Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxBody for a volatile transaction");
+
db.Table<Schema::TxDetails>().Key(txId, Self->TabletID())
.Update<Schema::TxDetails::Body>(TString(txBody));
}
@@ -265,12 +279,15 @@ void TTransQueue::RemoveTx(NIceDb::TNiceDb &db,
const TOperation &op) {
using Schema = TDataShard::Schema;
- db.Table<Schema::TxMain>().Key(op.GetTxId()).Delete();
- db.Table<Schema::TxDetails>().Key(op.GetTxId(), Self->TabletID()).Delete();
- db.Table<Schema::TxArtifacts>().Key(op.GetTxId()).Delete();
+ if (!op.HasVolatilePrepareFlag()) {
+ db.Table<Schema::TxMain>().Key(op.GetTxId()).Delete();
+ db.Table<Schema::TxDetails>().Key(op.GetTxId(), Self->TabletID()).Delete();
+ db.Table<Schema::TxArtifacts>().Key(op.GetTxId()).Delete();
+
+ db.Table<Schema::PlanQueue>().Key(op.GetStep(), op.GetTxId()).Delete();
+ db.Table<Schema::DeadlineQueue>().Key(op.GetMaxStep(), op.GetTxId()).Delete();
+ }
- db.Table<Schema::PlanQueue>().Key(op.GetStep(), op.GetTxId()).Delete();
- db.Table<Schema::DeadlineQueue>().Key(op.GetMaxStep(), op.GetTxId()).Delete();
DeadlineQueue.erase(std::make_pair(op.GetMaxStep(), op.GetTxId()));
RemoveTxInFly(op.GetTxId());
PlannedTxs.erase(op.GetStepOrder());
@@ -321,6 +338,11 @@ bool TTransQueue::LoadTxDetails(NIceDb::TNiceDb &db,
ui64 &artifactFlags) {
using Schema = TDataShard::Schema;
+ auto it = TxsInFly.find(txId);
+ Y_VERIFY(it != TxsInFly.end());
+
+ Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected LoadTxDetails for a volatile transaction");
+
auto detailsRow = db.Table<Schema::TxDetails>().Key(txId, Self->TabletID()).Select();
auto artifactsRow = db.Table<Schema::TxArtifacts>().Key(txId).Select();
@@ -370,11 +392,14 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
ui64 maxStep = it->second->GetMaxStep();
- if (!ClearTxDetails(db, txId))
- return false;
+ if (!it->second->HasVolatilePrepareFlag()) {
+ if (!ClearTxDetails(db, txId)) {
+ return false;
+ }
- db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete();
- db.Table<Schema::TxMain>().Key(txId).Delete();
+ db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete();
+ db.Table<Schema::TxMain>().Key(txId).Delete();
+ }
DeadlineQueue.erase(std::make_pair(maxStep, txId));
RemoveTxInFly(txId);
@@ -401,11 +426,15 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Cleaning up tx " << txId << " with maxStep " << maxStep << " at outdatedStep " << outdatedStep);
- if (!ClearTxDetails(db, txId))
- return ECleanupStatus::Restart;
+ auto it = TxsInFly.find(txId);
+ if (it != TxsInFly.end() && !it->second->HasVolatilePrepareFlag()) {
+ if (!ClearTxDetails(db, txId)) {
+ return ECleanupStatus::Restart;
+ }
- db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete();
- db.Table<Schema::TxMain>().Key(txId).Delete();
+ db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete();
+ db.Table<Schema::TxMain>().Key(txId).Delete();
+ }
erasedDeadlines.insert(pr);
outdatedTxs.push_back(txId);
@@ -444,7 +473,9 @@ void TTransQueue::PlanTx(TOperation::TPtr op,
}
using Schema = TDataShard::Schema;
- db.Table<Schema::PlanQueue>().Key(step, op->GetTxId()).Update();
+ if (!op->HasVolatilePrepareFlag()) {
+ db.Table<Schema::PlanQueue>().Key(step, op->GetTxId()).Update();
+ }
PlannedTxs.emplace(op->GetStepOrder());
PlannedTxsByKind[op->GetKind()].emplace(op->GetStepOrder());
DeadlineQueue.erase(std::make_pair(op->GetMaxStep(), op->GetTxId()));
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
new file mode 100644
index 00000000000..0258c9c2e3b
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -0,0 +1,79 @@
+#include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
+#include "datashard_active_transaction.h"
+
+namespace NKikimr {
+
+using namespace NKikimr::NDataShard;
+using namespace NKikimr::NDataShard::NKqpHelpers;
+using namespace NSchemeShard;
+using namespace Tests;
+
+Y_UNIT_TEST_SUITE(DataShardVolatile) {
+
+ Y_UNIT_TEST(DistributedWrite) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ auto forceVolatile = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvProposeTransaction>();
+ auto flags = msg->Record.GetFlags();
+ if (!(flags & TTxFlags::Immediate)) {
+ Cerr << "... forcing propose to use volatile prepare" << Endl;
+ flags |= TTxFlags::VolatilePrepare;
+ msg->Record.SetFlags(flags);
+ }
+ break;
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(forceVolatile);
+
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
+
+ Cerr << "!!! distributed write start" << Endl;
+ ExecSQL(server, sender, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )");
+ Cerr << "!!! distributed write end" << Endl;
+
+ runtime.SetObserverFunc(prevObserverFunc);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } }");
+ }
+
+} // Y_UNIT_TEST_SUITE(DataShardVolatile)
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index d4689228535..812b0796ddb 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -91,6 +91,10 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
}
}
+ if (op->HasVolatilePrepareFlag() && !op->HasResultSentFlag() && !op->IsDirty()) {
+ op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
+ }
+
if (!op->HasResultSentFlag() && (op->IsDirty() || !Pipeline.WaitCompletion(op)))
CompleteRequest(op, ctx);
@@ -181,6 +185,8 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
} else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) {
// TODO: we should actually measure a read timestamp and use it here
DataShard.SendImmediateReadResult(op->GetTarget(), res.Release(), op->GetCookie());
+ } else if (op->HasVolatilePrepareFlag() && !op->IsDirty()) {
+ DataShard.SendWithConfirmedReadOnlyLease(op->GetFinishProposeTs(), op->GetTarget(), res.Release(), op->GetCookie());
} else {
ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie());
}
diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp
index 37c14b9b982..d681acf292e 100644
--- a/ydb/core/tx/datashard/operation.cpp
+++ b/ydb/core/tx/datashard/operation.cpp
@@ -270,5 +270,10 @@ bool TOperation::HasRuntimeConflicts() const noexcept
return !Dependencies.empty();
}
+void TOperation::SetFinishProposeTs() noexcept
+{
+ SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now());
+}
+
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 888cebdc5a1..fb182f810ca 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -196,7 +196,11 @@ public:
void SetImmediateFlag(bool val = true) { SetFlag(TTxFlags::Immediate, val); }
void ResetImmediateFlag() { ResetFlag(TTxFlags::Immediate); }
- bool IsImmediate() const { return HasImmediateFlag() && !HasForceOnlineFlag(); }
+ bool HasVolatilePrepareFlag() const { return HasFlag(TTxFlags::VolatilePrepare); }
+ void SetVolatilePrepareFlag(bool val = true) { SetFlag(TTxFlags::VolatilePrepare, val); }
+ void ResetVolatilePrepareFlag() { ResetFlag(TTxFlags::VolatilePrepare); }
+
+ bool IsImmediate() const { return HasImmediateFlag() && !HasForceOnlineFlag() && !HasVolatilePrepareFlag(); }
bool HasInProgressFlag() const { return HasFlag(TTxFlags::InProgress); }
void SetInProgressFlag(bool val = true) { SetFlag(TTxFlags::InProgress, val); }
@@ -752,6 +756,11 @@ public:
TString ExecutionProfileLogString(ui64 tabletId) const;
+ TMonotonic GetFinishProposeTs() const noexcept { return FinishProposeTs; }
+ void SetFinishProposeTs(TMonotonic now) noexcept { FinishProposeTs = now; }
+ void SetFinishProposeTs() noexcept;
+
+
protected:
TOperation()
: TOperation(TBasicOpInfo())
@@ -819,6 +828,8 @@ private:
size_t CurrentUnit;
TExecutionProfile ExecutionProfile;
+ TMonotonic FinishProposeTs;
+
static NMiniKQL::IEngineFlat::TValidationInfo EmptyKeysInfo;
public:
diff --git a/ydb/core/tx/datashard/store_data_tx_unit.cpp b/ydb/core/tx/datashard/store_data_tx_unit.cpp
index 8498e5c8b69..929002a4ca9 100644
--- a/ydb/core/tx/datashard/store_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/store_data_tx_unit.cpp
@@ -50,7 +50,10 @@ EExecutionStatus TStoreDataTxUnit::Execute(TOperation::TPtr op,
Pipeline.SaveForPropose(tx->GetDataTx());
Pipeline.ProposeTx(op, tx->GetTxBody(), txc, ctx);
- tx->ClearTxBody();
+ if (!op->HasVolatilePrepareFlag()) {
+ tx->ClearTxBody();
+ }
+
tx->ClearDataTx();
return EExecutionStatus::DelayCompleteNoMoreRestarts;
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..7c9ed5b640f
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt
@@ -0,0 +1,56 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_volatile)
+target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-datashard-ut_volatile
+ COMMAND
+ ydb-core-tx-datashard-ut_volatile
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-datashard-ut_volatile)
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..4ad68b372dc
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,58 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_volatile)
+target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-testing-unittest_main
+ core-tx-datashard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-datashard-ut_volatile
+ COMMAND
+ ydb-core-tx-datashard-ut_volatile
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-datashard-ut_volatile)
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt
new file mode 100644
index 00000000000..52605ae2bd2
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt
@@ -0,0 +1,60 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_volatile)
+target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-datashard-ut_volatile
+ COMMAND
+ ydb-core-tx-datashard-ut_volatile
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-datashard-ut_volatile)
diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt
new file mode 100644
index 00000000000..3e0811fb22e
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()