aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-11-10 14:02:51 +0300
committersnaury <snaury@ydb.tech>2023-11-10 15:17:07 +0300
commit03e99db03f2adf212fea696fc1a13a9ff6135cfb (patch)
tree086c80d152ecbe69bfad766583bd20418ac93a10
parentc1c21c7a0fc92c4a95cfd387bac27b4d738eb0c2 (diff)
downloadydb-03e99db03f2adf212fea696fc1a13a9ff6135cfb.tar.gz
Support delayed acks for volatile transactions KIKIMR-19912
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp3
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp69
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h12
3 files changed, 72 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 3f9f66e6d93..0daa8090a6f 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -628,8 +628,7 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs,
if (Self->GetVolatileTxManager().FindByTxId(txId)) {
// This readset is for a known volatile transaction, we need to
// hand it off to volatile tx manager.
- Self->GetVolatileTxManager().ProcessReadSet(rs, txc);
- return true;
+ return Self->GetVolatileTxManager().ProcessReadSet(rs, std::move(ack), txc);
}
if (step <= OutdatedReadSetStep()) {
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 7c34ff3c3e0..470fa7ef44c 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -164,6 +164,11 @@ namespace NKikimr::NDataShard {
Y_ABORT_UNLESS(info && info->State == EVolatileTxState::Aborting);
Y_ABORT_UNLESS(info->AddCommitted);
+ for (auto& ev : info->DelayedAcks) {
+ TActivationContext::Send(ev.Release());
+ }
+ info->DelayedAcks.clear();
+
// Make a copy since it will disappear soon
auto commitTxIds = info->CommitTxIds;
@@ -572,6 +577,9 @@ namespace NKikimr::NDataShard {
NIceDb::TNiceDb db(txc.DB);
+ for (ui64 shardId : info->DelayedConfirmations) {
+ db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete();
+ }
for (ui64 shardId : info->Participants) {
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete();
Self->RemoveExpectation(shardId, info->TxId);
@@ -702,8 +710,9 @@ namespace NKikimr::NDataShard {
RemoveFromCommitOrder(info);
}
- void TVolatileTxManager::ProcessReadSet(
+ bool TVolatileTxManager::ProcessReadSet(
const TEvTxProcessing::TEvReadSet& rs,
+ THolder<IEventHandle>&& ack,
TTransactionContext& txc)
{
using Schema = TDataShard::Schema;
@@ -714,9 +723,19 @@ namespace NKikimr::NDataShard {
auto* info = FindByTxId(txId);
Y_ABORT_UNLESS(info, "ProcessReadSet called for an unknown volatile tx");
- if (info->State != EVolatileTxState::Waiting) {
- // Transaction is already decided
- return;
+ switch (info->State) {
+ case EVolatileTxState::Waiting:
+ break;
+
+ case EVolatileTxState::Committed:
+ // We may ack normally, since committed state is persistent
+ return true;
+
+ case EVolatileTxState::Aborting:
+ // Aborting state will not change as long as we're still leader
+ return true;
+ // Ack readset normally as long as we're still a leader
+ return true;
}
ui64 srcTabletId = record.GetTabletSource();
@@ -725,12 +744,17 @@ namespace NKikimr::NDataShard {
if (dstTabletId != Self->TabletID()) {
LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Unexpected readset from " << srcTabletId << " to " << dstTabletId << " at tablet " << Self->TabletID());
- return;
+ return true;
}
if (!info->Participants.contains(srcTabletId)) {
// We are not waiting for readset from this participant
- return;
+ if (info->DelayedConfirmations.contains(srcTabletId)) {
+ // Confirmation is delayed, delay this new ack as well
+ info->DelayedAcks.push_back(std::move(ack));
+ return false;
+ }
+ return true;
}
bool committed = [&]() {
@@ -767,13 +791,15 @@ namespace NKikimr::NDataShard {
if (!committed) {
AbortWaitingTransaction(info);
- return;
+ return true;
}
+ NIceDb::TNiceDb db(txc.DB);
+
info->Participants.erase(srcTabletId);
+ info->DelayedAcks.push_back(std::move(ack));
+ info->DelayedConfirmations.insert(srcTabletId);
- NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::TxVolatileParticipants>().Key(txId, srcTabletId).Delete();
Self->RemoveExpectation(srcTabletId, txId);
if (info->Participants.empty()) {
@@ -783,6 +809,29 @@ namespace NKikimr::NDataShard {
info->State = EVolatileTxState::Committed;
db.Table<Schema::TxVolatileDetails>().Key(txId).Update(
NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State));
+
+ // Remove all delayed confirmations, since this tx is already writing
+ for (ui64 shardId : info->DelayedConfirmations) {
+ db.Table<Schema::TxVolatileParticipants>().Key(txId, shardId).Delete();
+ }
+ info->DelayedConfirmations.clear();
+
+ // Send delayed acks on commit
+ // TODO: maybe move it into a parameter?
+ struct TDelayedAcksState : public TThrRefBase {
+ TVector<THolder<IEventHandle>> DelayedAcks;
+
+ TDelayedAcksState(TVolatileTxInfo* info)
+ : DelayedAcks(std::move(info->DelayedAcks))
+ {}
+ };
+ txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() {
+ for (auto& ev : state->DelayedAcks) {
+ TActivationContext::Send(ev.Release());
+ }
+ });
+ info->DelayedAcks.clear();
+
// We may run callbacks immediately when effects are committed
if (info->AddCommitted) {
RunCommitCallbacks(info);
@@ -791,6 +840,8 @@ namespace NKikimr::NDataShard {
AddPendingCommit(txId);
}
}
+
+ return false;
}
void TVolatileTxManager::RunCommitCallbacks(TVolatileTxInfo* info) {
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index b9991a07ee9..ca7c05e2058 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -65,6 +65,9 @@ namespace NKikimr::NDataShard {
absl::flat_hash_set<ui64> WaitingRemovalOperations;
TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks;
+ TVector<THolder<IEventHandle>> DelayedAcks;
+ absl::flat_hash_set<ui64> DelayedConfirmations;
+
template<class TTag>
bool IsInList() const {
using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>;
@@ -217,8 +220,15 @@ namespace NKikimr::NDataShard {
void AbortWaitingTransaction(TVolatileTxInfo* info);
- void ProcessReadSet(
+ /**
+ * Process incoming readset for a known volatile transaction.
+ *
+ * Returns true when readset should be acknowledged (e.g. because it
+ * was persisted), false when ack is consumed.
+ */
+ bool ProcessReadSet(
const TEvTxProcessing::TEvReadSet& rs,
+ THolder<IEventHandle>&& ack,
TTransactionContext& txc);
void ProcessReadSetMissing(