diff options
author | snaury <snaury@ydb.tech> | 2022-12-06 16:37:57 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-12-06 16:37:57 +0300 |
commit | f34f958708d7a10869e32bfbffb2cd4dd79cf1c1 (patch) | |
tree | 313c03535bd396e7d007d77880592a6a83a12c98 | |
parent | 0ca041dc2b282b64373ae3197952331ceaa430e5 (diff) | |
download | ydb-f34f958708d7a10869e32bfbffb2cd4dd79cf1c1.tar.gz |
Start introducing volatile distributed transactions
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 53 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.cpp | 61 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 79 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/store_data_tx_unit.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt | 56 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt | 58 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt | 60 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_volatile/CMakeLists.txt | 15 |
17 files changed, 449 insertions, 26 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 3101a8e04f0..5658b055b84 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -35,6 +35,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_testload) add_subdirectory(ut_upload_rows) +add_subdirectory(ut_volatile) add_library(core-tx-datashard) target_compile_options(core-tx-datashard PRIVATE diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 3ad5b986bb3..757ddf7068c 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1931,15 +1931,35 @@ void TDataShard::SendImmediateWriteResult( } } -void TDataShard::SendImmediateReadResult( - TMonotonic readTime, +TMonotonic TDataShard::ConfirmReadOnlyLease() { + if (IsFollower() || !ReadOnlyLeaseEnabled()) { + // Do nothing and return an empty timestamp + return {}; + } + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + Executor()->ConfirmReadOnlyLease(ts); + return ts; +} + +void TDataShard::ConfirmReadOnlyLease(TMonotonic ts) { + if (IsFollower() || !ReadOnlyLeaseEnabled()) { + // Do nothing + return; + } + + Executor()->ConfirmReadOnlyLease(ts); +} + +void TDataShard::SendWithConfirmedReadOnlyLease( + TMonotonic ts, const TActorId& target, IEventBase* event, ui64 cookie, const TActorId& sessionId) { if (IsFollower() || !ReadOnlyLeaseEnabled()) { - // We just send possibly stale result (old behavior) + // Send possibly stale result (legacy behavior) if (!sessionId) { Send(target, event, 0, cookie); } else { @@ -1962,23 +1982,42 @@ void TDataShard::SendImmediateReadResult( } }; - if (!readTime) { - readTime = AppData()->MonotonicTimeProvider->Now(); + if (!ts) { + ts = AppData()->MonotonicTimeProvider->Now(); } - Executor()->ConfirmReadOnlyLease(readTime, + Executor()->ConfirmReadOnlyLease(ts, [state = MakeIntrusive<TSendState>(sessionId, target, SelfId(), event, cookie)] { TActivationContext::Send(state->Ev.Release()); }); } +void TDataShard::SendWithConfirmedReadOnlyLease( + const TActorId& target, + IEventBase* event, + ui64 cookie, + const TActorId& sessionId) +{ + SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId); +} + +void TDataShard::SendImmediateReadResult( + TMonotonic readTime, + const TActorId& target, + IEventBase* event, + ui64 cookie, + const TActorId& sessionId) +{ + SendWithConfirmedReadOnlyLease(readTime, target, event, cookie, sessionId); +} + void TDataShard::SendImmediateReadResult( const TActorId& target, IEventBase* event, ui64 cookie, const TActorId& sessionId) { - SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie, sessionId); + SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId); } void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) { diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 1a3b91d68a2..a2237a97ca0 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -37,6 +37,11 @@ namespace NDataShard { ForceOnline = 0x04, Immediate = 0x08, + // Transaction must be prepared as a ditributed transaction, but + // must also be volatile, i.e. expect that other participants may + // cancel it even after it is planned. + VolatilePrepare = 0x10, + PublicFlagsMask = 0x000000000000FFFF, ////////////////////////////// diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 159d0b516f2..70f041a9787 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -348,7 +348,7 @@ TActiveTransaction::~TActiveTransaction() void TActiveTransaction::FillTxData(TValidatedDataTx::TPtr dataTx) { Y_VERIFY(!DataTx); - Y_VERIFY(TxBody.empty()); + Y_VERIFY(TxBody.empty() || HasVolatilePrepareFlag()); Target = dataTx->Source(); DataTx = dataTx; @@ -397,6 +397,30 @@ void TActiveTransaction::FillTxData(TDataShard *self, TrackMemory(); } +void TActiveTransaction::FillVolatileTxData(TDataShard *self, + TTransactionContext &txc, + const TActorContext &ctx) +{ + UntrackMemory(); + + Y_VERIFY(!DataTx); + Y_VERIFY(!TxBody.empty()); + + if (IsDataTx() || IsReadTable()) { + BuildDataTx(self, txc, ctx); + Y_VERIFY(DataTx->Ready()); + + if (DataTx->HasStreamResponse()) + SetStreamSink(DataTx->GetSink()); + } else if (IsSnapshotTx()) { + BuildSnapshotTx(); + } else { + Y_FAIL("Unexpected FillVolatileTxData call"); + } + + TrackMemory(); +} + TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self, TTransactionContext &txc, const TActorContext &ctx) diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index ce629bb4b27..8a1c715d75e 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -354,6 +354,9 @@ public: const TString &txBody, const TVector<TSysTables::TLocksTable::TLock> &locks, ui64 artifactFlags); + void FillVolatileTxData(TDataShard *self, + TTransactionContext &txc, + const TActorContext &ctx); const TString &GetTxBody() const { return TxBody; } void SetTxBody(const TString &txBody) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b20bce5ed4b..04bdf2a5afc 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1645,6 +1645,19 @@ public: ui64 GetMaxObservedStep() const; void SendImmediateWriteResult( const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0); + TMonotonic ConfirmReadOnlyLease(); + void ConfirmReadOnlyLease(TMonotonic ts); + void SendWithConfirmedReadOnlyLease( + TMonotonic ts, + const TActorId& target, + IEventBase* event, + ui64 cookie = 0, + const TActorId& sessionId = {}); + void SendWithConfirmedReadOnlyLease( + const TActorId& target, + IEventBase* event, + ui64 cookie = 0, + const TActorId& sessionId = {}); void SendImmediateReadResult( TMonotonic readTime, const TActorId& target, diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 13df5952495..6f95c80acad 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -531,6 +531,18 @@ bool TPipeline::LoadTxDetails(TTransactionContext &txc, LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadTxDetails at " << Self->TabletID() << " got data tx from cache " << tx->GetStep() << ":" << tx->GetTxId()); + } else if (tx->HasVolatilePrepareFlag()) { + // Since transaction is volatile it was never stored on disk, and it + // shouldn't have any artifacts yet. + tx->FillVolatileTxData(Self, txc, ctx); + + ui32 keysCount = 0; + keysCount = tx->ExtractKeys(); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "LoadTxDetails at " << Self->TabletID() << " loaded tx from memory " + << tx->GetStep() << ":" << tx->GetTxId() << " keys extracted: " + << keysCount); } else { NIceDb::TNiceDb db(txc.DB); TActorId target; @@ -993,7 +1005,9 @@ void TPipeline::ProposeTx(TOperation::TPtr op, const TStringBuf &txBody, TTransa { NIceDb::TNiceDb db(txc.DB); SetProposed(op->GetTxId(), op->GetTarget()); - PreserveSchema(db, op->GetMaxStep()); + if (!op->HasVolatilePrepareFlag()) { + PreserveSchema(db, op->GetMaxStep()); + } Self->TransQueue.ProposeTx(db, op, op->GetTarget(), txBody); if (Self->IsStopping() && op->GetTarget()) { // Send notification if we prepared a tx while shard was stopping diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp index d7096c25b32..b6e07f742ca 100644 --- a/ydb/core/tx/datashard/datashard_trans_queue.cpp +++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp @@ -225,6 +225,12 @@ void TTransQueue::ProposeTx(NIceDb::TNiceDb& db, TOperation::TPtr op, TActorId s const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask; AddTxInFly(op); + + if (op->HasVolatilePrepareFlag()) { + // We keep volatile transactions in memory and don't store anything + return; + } + db.Table<Schema::TxMain>().Key(op->GetTxId()).Update( NIceDb::TUpdate<Schema::TxMain::Kind>(op->GetKind()), NIceDb::TUpdate<Schema::TxMain::Flags>(op->GetFlags() & preserveFlagsMask), @@ -245,7 +251,10 @@ void TTransQueue::ProposeTx(NIceDb::TNiceDb& db, TOperation::TPtr op, TActorId s void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) { using Schema = TDataShard::Schema; - Y_VERIFY(TxsInFly.contains(txId)); + auto it = TxsInFly.find(txId); + Y_VERIFY(it != TxsInFly.end()); + + Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxFlags for a volatile transaction"); const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask; @@ -257,6 +266,11 @@ void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) { void TTransQueue::UpdateTxBody(NIceDb::TNiceDb& db, ui64 txId, const TStringBuf& txBody) { using Schema = TDataShard::Schema; + auto it = TxsInFly.find(txId); + Y_VERIFY(it != TxsInFly.end()); + + Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxBody for a volatile transaction"); + db.Table<Schema::TxDetails>().Key(txId, Self->TabletID()) .Update<Schema::TxDetails::Body>(TString(txBody)); } @@ -265,12 +279,15 @@ void TTransQueue::RemoveTx(NIceDb::TNiceDb &db, const TOperation &op) { using Schema = TDataShard::Schema; - db.Table<Schema::TxMain>().Key(op.GetTxId()).Delete(); - db.Table<Schema::TxDetails>().Key(op.GetTxId(), Self->TabletID()).Delete(); - db.Table<Schema::TxArtifacts>().Key(op.GetTxId()).Delete(); + if (!op.HasVolatilePrepareFlag()) { + db.Table<Schema::TxMain>().Key(op.GetTxId()).Delete(); + db.Table<Schema::TxDetails>().Key(op.GetTxId(), Self->TabletID()).Delete(); + db.Table<Schema::TxArtifacts>().Key(op.GetTxId()).Delete(); + + db.Table<Schema::PlanQueue>().Key(op.GetStep(), op.GetTxId()).Delete(); + db.Table<Schema::DeadlineQueue>().Key(op.GetMaxStep(), op.GetTxId()).Delete(); + } - db.Table<Schema::PlanQueue>().Key(op.GetStep(), op.GetTxId()).Delete(); - db.Table<Schema::DeadlineQueue>().Key(op.GetMaxStep(), op.GetTxId()).Delete(); DeadlineQueue.erase(std::make_pair(op.GetMaxStep(), op.GetTxId())); RemoveTxInFly(op.GetTxId()); PlannedTxs.erase(op.GetStepOrder()); @@ -321,6 +338,11 @@ bool TTransQueue::LoadTxDetails(NIceDb::TNiceDb &db, ui64 &artifactFlags) { using Schema = TDataShard::Schema; + auto it = TxsInFly.find(txId); + Y_VERIFY(it != TxsInFly.end()); + + Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected LoadTxDetails for a volatile transaction"); + auto detailsRow = db.Table<Schema::TxDetails>().Key(txId, Self->TabletID()).Select(); auto artifactsRow = db.Table<Schema::TxArtifacts>().Key(txId).Select(); @@ -370,11 +392,14 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) { ui64 maxStep = it->second->GetMaxStep(); - if (!ClearTxDetails(db, txId)) - return false; + if (!it->second->HasVolatilePrepareFlag()) { + if (!ClearTxDetails(db, txId)) { + return false; + } - db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete(); - db.Table<Schema::TxMain>().Key(txId).Delete(); + db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete(); + db.Table<Schema::TxMain>().Key(txId).Delete(); + } DeadlineQueue.erase(std::make_pair(maxStep, txId)); RemoveTxInFly(txId); @@ -401,11 +426,15 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Cleaning up tx " << txId << " with maxStep " << maxStep << " at outdatedStep " << outdatedStep); - if (!ClearTxDetails(db, txId)) - return ECleanupStatus::Restart; + auto it = TxsInFly.find(txId); + if (it != TxsInFly.end() && !it->second->HasVolatilePrepareFlag()) { + if (!ClearTxDetails(db, txId)) { + return ECleanupStatus::Restart; + } - db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete(); - db.Table<Schema::TxMain>().Key(txId).Delete(); + db.Table<Schema::DeadlineQueue>().Key(maxStep, txId).Delete(); + db.Table<Schema::TxMain>().Key(txId).Delete(); + } erasedDeadlines.insert(pr); outdatedTxs.push_back(txId); @@ -444,7 +473,9 @@ void TTransQueue::PlanTx(TOperation::TPtr op, } using Schema = TDataShard::Schema; - db.Table<Schema::PlanQueue>().Key(step, op->GetTxId()).Update(); + if (!op->HasVolatilePrepareFlag()) { + db.Table<Schema::PlanQueue>().Key(step, op->GetTxId()).Update(); + } PlannedTxs.emplace(op->GetStepOrder()); PlannedTxsByKind[op->GetKind()].emplace(op->GetStepOrder()); DeadlineQueue.erase(std::make_pair(op->GetMaxStep(), op->GetTxId())); diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp new file mode 100644 index 00000000000..0258c9c2e3b --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -0,0 +1,79 @@ +#include "datashard_ut_common.h" +#include "datashard_ut_common_kqp.h" +#include "datashard_active_transaction.h" + +namespace NKikimr { + +using namespace NKikimr::NDataShard; +using namespace NKikimr::NDataShard::NKqpHelpers; +using namespace NSchemeShard; +using namespace Tests; + +Y_UNIT_TEST_SUITE(DataShardVolatile) { + + Y_UNIT_TEST(DistributedWrite) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + auto forceVolatile = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + auto* msg = ev->Get<TEvDataShard::TEvProposeTransaction>(); + auto flags = msg->Record.GetFlags(); + if (!(flags & TTxFlags::Immediate)) { + Cerr << "... forcing propose to use volatile prepare" << Endl; + flags |= TTxFlags::VolatilePrepare; + msg->Record.SetFlags(flags); + } + break; + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(forceVolatile); + + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); + + Cerr << "!!! distributed write start" << Endl; + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )"); + Cerr << "!!! distributed write end" << Endl; + + runtime.SetObserverFunc(prevObserverFunc); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } }"); + } + +} // Y_UNIT_TEST_SUITE(DataShardVolatile) + +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index d4689228535..812b0796ddb 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -91,6 +91,10 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, } } + if (op->HasVolatilePrepareFlag() && !op->HasResultSentFlag() && !op->IsDirty()) { + op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease()); + } + if (!op->HasResultSentFlag() && (op->IsDirty() || !Pipeline.WaitCompletion(op))) CompleteRequest(op, ctx); @@ -181,6 +185,8 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, } else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) { // TODO: we should actually measure a read timestamp and use it here DataShard.SendImmediateReadResult(op->GetTarget(), res.Release(), op->GetCookie()); + } else if (op->HasVolatilePrepareFlag() && !op->IsDirty()) { + DataShard.SendWithConfirmedReadOnlyLease(op->GetFinishProposeTs(), op->GetTarget(), res.Release(), op->GetCookie()); } else { ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie()); } diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp index 37c14b9b982..d681acf292e 100644 --- a/ydb/core/tx/datashard/operation.cpp +++ b/ydb/core/tx/datashard/operation.cpp @@ -270,5 +270,10 @@ bool TOperation::HasRuntimeConflicts() const noexcept return !Dependencies.empty(); } +void TOperation::SetFinishProposeTs() noexcept +{ + SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now()); +} + } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 888cebdc5a1..fb182f810ca 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -196,7 +196,11 @@ public: void SetImmediateFlag(bool val = true) { SetFlag(TTxFlags::Immediate, val); } void ResetImmediateFlag() { ResetFlag(TTxFlags::Immediate); } - bool IsImmediate() const { return HasImmediateFlag() && !HasForceOnlineFlag(); } + bool HasVolatilePrepareFlag() const { return HasFlag(TTxFlags::VolatilePrepare); } + void SetVolatilePrepareFlag(bool val = true) { SetFlag(TTxFlags::VolatilePrepare, val); } + void ResetVolatilePrepareFlag() { ResetFlag(TTxFlags::VolatilePrepare); } + + bool IsImmediate() const { return HasImmediateFlag() && !HasForceOnlineFlag() && !HasVolatilePrepareFlag(); } bool HasInProgressFlag() const { return HasFlag(TTxFlags::InProgress); } void SetInProgressFlag(bool val = true) { SetFlag(TTxFlags::InProgress, val); } @@ -752,6 +756,11 @@ public: TString ExecutionProfileLogString(ui64 tabletId) const; + TMonotonic GetFinishProposeTs() const noexcept { return FinishProposeTs; } + void SetFinishProposeTs(TMonotonic now) noexcept { FinishProposeTs = now; } + void SetFinishProposeTs() noexcept; + + protected: TOperation() : TOperation(TBasicOpInfo()) @@ -819,6 +828,8 @@ private: size_t CurrentUnit; TExecutionProfile ExecutionProfile; + TMonotonic FinishProposeTs; + static NMiniKQL::IEngineFlat::TValidationInfo EmptyKeysInfo; public: diff --git a/ydb/core/tx/datashard/store_data_tx_unit.cpp b/ydb/core/tx/datashard/store_data_tx_unit.cpp index 8498e5c8b69..929002a4ca9 100644 --- a/ydb/core/tx/datashard/store_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/store_data_tx_unit.cpp @@ -50,7 +50,10 @@ EExecutionStatus TStoreDataTxUnit::Execute(TOperation::TPtr op, Pipeline.SaveForPropose(tx->GetDataTx()); Pipeline.ProposeTx(op, tx->GetTxBody(), txc, ctx); - tx->ClearTxBody(); + if (!op->HasVolatilePrepareFlag()) { + tx->ClearTxBody(); + } + tx->ClearDataTx(); return EExecutionStatus::DelayCompleteNoMoreRestarts; diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt new file mode 100644 index 00000000000..7c9ed5b640f --- /dev/null +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.darwin.txt @@ -0,0 +1,56 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_volatile) +target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp +) +add_test( + NAME + ydb-core-tx-datashard-ut_volatile + COMMAND + ydb-core-tx-datashard-ut_volatile + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-datashard-ut_volatile) diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..4ad68b372dc --- /dev/null +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-aarch64.txt @@ -0,0 +1,58 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_volatile) +target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + core-tx-datashard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp +) +add_test( + NAME + ydb-core-tx-datashard-ut_volatile + COMMAND + ydb-core-tx-datashard-ut_volatile + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-datashard-ut_volatile) diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt new file mode 100644 index 00000000000..52605ae2bd2 --- /dev/null +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.linux.txt @@ -0,0 +1,60 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_volatile) +target_compile_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_volatile PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_volatile PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-datashard-ut_volatile PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_volatile.cpp +) +add_test( + NAME + ydb-core-tx-datashard-ut_volatile + COMMAND + ydb-core-tx-datashard-ut_volatile + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-datashard-ut_volatile) diff --git a/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt b/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/ydb/core/tx/datashard/ut_volatile/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() |