diff options
author | snaury <snaury@ydb.tech> | 2023-11-10 14:02:51 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-11-10 15:17:07 +0300 |
commit | 03e99db03f2adf212fea696fc1a13a9ff6135cfb (patch) | |
tree | 086c80d152ecbe69bfad766583bd20418ac93a10 | |
parent | c1c21c7a0fc92c4a95cfd387bac27b4d738eb0c2 (diff) | |
download | ydb-03e99db03f2adf212fea696fc1a13a9ff6135cfb.tar.gz |
Support delayed acks for volatile transactions KIKIMR-19912
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 69 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 12 |
3 files changed, 72 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 3f9f66e6d93..0daa8090a6f 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -628,8 +628,7 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs, if (Self->GetVolatileTxManager().FindByTxId(txId)) { // This readset is for a known volatile transaction, we need to // hand it off to volatile tx manager. - Self->GetVolatileTxManager().ProcessReadSet(rs, txc); - return true; + return Self->GetVolatileTxManager().ProcessReadSet(rs, std::move(ack), txc); } if (step <= OutdatedReadSetStep()) { diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 7c34ff3c3e0..470fa7ef44c 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -164,6 +164,11 @@ namespace NKikimr::NDataShard { Y_ABORT_UNLESS(info && info->State == EVolatileTxState::Aborting); Y_ABORT_UNLESS(info->AddCommitted); + for (auto& ev : info->DelayedAcks) { + TActivationContext::Send(ev.Release()); + } + info->DelayedAcks.clear(); + // Make a copy since it will disappear soon auto commitTxIds = info->CommitTxIds; @@ -572,6 +577,9 @@ namespace NKikimr::NDataShard { NIceDb::TNiceDb db(txc.DB); + for (ui64 shardId : info->DelayedConfirmations) { + db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete(); + } for (ui64 shardId : info->Participants) { db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete(); Self->RemoveExpectation(shardId, info->TxId); @@ -702,8 +710,9 @@ namespace NKikimr::NDataShard { RemoveFromCommitOrder(info); } - void TVolatileTxManager::ProcessReadSet( + bool TVolatileTxManager::ProcessReadSet( const TEvTxProcessing::TEvReadSet& rs, + THolder<IEventHandle>&& ack, TTransactionContext& txc) { using Schema = TDataShard::Schema; @@ -714,9 +723,19 @@ namespace NKikimr::NDataShard { auto* info = FindByTxId(txId); Y_ABORT_UNLESS(info, "ProcessReadSet called for an unknown volatile tx"); - if (info->State != EVolatileTxState::Waiting) { - // Transaction is already decided - return; + switch (info->State) { + case EVolatileTxState::Waiting: + break; + + case EVolatileTxState::Committed: + // We may ack normally, since committed state is persistent + return true; + + case EVolatileTxState::Aborting: + // Aborting state will not change as long as we're still leader + return true; + // Ack readset normally as long as we're still a leader + return true; } ui64 srcTabletId = record.GetTabletSource(); @@ -725,12 +744,17 @@ namespace NKikimr::NDataShard { if (dstTabletId != Self->TabletID()) { LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Unexpected readset from " << srcTabletId << " to " << dstTabletId << " at tablet " << Self->TabletID()); - return; + return true; } if (!info->Participants.contains(srcTabletId)) { // We are not waiting for readset from this participant - return; + if (info->DelayedConfirmations.contains(srcTabletId)) { + // Confirmation is delayed, delay this new ack as well + info->DelayedAcks.push_back(std::move(ack)); + return false; + } + return true; } bool committed = [&]() { @@ -767,13 +791,15 @@ namespace NKikimr::NDataShard { if (!committed) { AbortWaitingTransaction(info); - return; + return true; } + NIceDb::TNiceDb db(txc.DB); + info->Participants.erase(srcTabletId); + info->DelayedAcks.push_back(std::move(ack)); + info->DelayedConfirmations.insert(srcTabletId); - NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::TxVolatileParticipants>().Key(txId, srcTabletId).Delete(); Self->RemoveExpectation(srcTabletId, txId); if (info->Participants.empty()) { @@ -783,6 +809,29 @@ namespace NKikimr::NDataShard { info->State = EVolatileTxState::Committed; db.Table<Schema::TxVolatileDetails>().Key(txId).Update( NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State)); + + // Remove all delayed confirmations, since this tx is already writing + for (ui64 shardId : info->DelayedConfirmations) { + db.Table<Schema::TxVolatileParticipants>().Key(txId, shardId).Delete(); + } + info->DelayedConfirmations.clear(); + + // Send delayed acks on commit + // TODO: maybe move it into a parameter? + struct TDelayedAcksState : public TThrRefBase { + TVector<THolder<IEventHandle>> DelayedAcks; + + TDelayedAcksState(TVolatileTxInfo* info) + : DelayedAcks(std::move(info->DelayedAcks)) + {} + }; + txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() { + for (auto& ev : state->DelayedAcks) { + TActivationContext::Send(ev.Release()); + } + }); + info->DelayedAcks.clear(); + // We may run callbacks immediately when effects are committed if (info->AddCommitted) { RunCommitCallbacks(info); @@ -791,6 +840,8 @@ namespace NKikimr::NDataShard { AddPendingCommit(txId); } } + + return false; } void TVolatileTxManager::RunCommitCallbacks(TVolatileTxInfo* info) { diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index b9991a07ee9..ca7c05e2058 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -65,6 +65,9 @@ namespace NKikimr::NDataShard { absl::flat_hash_set<ui64> WaitingRemovalOperations; TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks; + TVector<THolder<IEventHandle>> DelayedAcks; + absl::flat_hash_set<ui64> DelayedConfirmations; + template<class TTag> bool IsInList() const { using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>; @@ -217,8 +220,15 @@ namespace NKikimr::NDataShard { void AbortWaitingTransaction(TVolatileTxInfo* info); - void ProcessReadSet( + /** + * Process incoming readset for a known volatile transaction. + * + * Returns true when readset should be acknowledged (e.g. because it + * was persisted), false when ack is consumed. + */ + bool ProcessReadSet( const TEvTxProcessing::TEvReadSet& rs, + THolder<IEventHandle>&& ack, TTransactionContext& txc); void ProcessReadSetMissing( |