aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-08-14 11:10:12 +0300
committersnaury <snaury@ydb.tech>2023-08-14 12:06:43 +0300
commitf6d091c5a6d57dbe4ac04d86237b21dc2cd40c72 (patch)
tree508fabcebabc60f19a1f1193daf786cd706a4cfe
parent0677981702178f1a002820ddcc517389022d197a (diff)
downloadydb-f6d091c5a6d57dbe4ac04d86237b21dc2cd40c72.tar.gz
Use volatile planning for volatile transactions KIKIMR-18580
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp4
-rw-r--r--ydb/core/tx/coordinator/coordinator__plan_step.cpp6
-rw-r--r--ydb/core/tx/coordinator/mediator_queue.cpp31
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp2
4 files changed, 32 insertions, 11 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 383a2a34163..f0d11af1cda 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -901,6 +901,10 @@ private:
transaction.SetMinStep(aggrMinStep);
transaction.SetMaxStep(aggrMaxStep);
+ if (VolatileTx) {
+ transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
+ }
+
LOG_T("Execute planned transaction, coordinator: " << TxCoordinator);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
}
diff --git a/ydb/core/tx/coordinator/coordinator__plan_step.cpp b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
index 7b3eb35daf1..28ac658346a 100644
--- a/ydb/core/tx/coordinator/coordinator__plan_step.cpp
+++ b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
@@ -220,10 +220,8 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
Self->SendStepConfirmations(ProxyPlanConfirmations, ctx);
bool needCommit = (
- // We want to persist steps that are aligned to configured resolution
- (PlanOnStep % Self->Config.Resolution) == 0 ||
- // We want to extend lease when less than half is left for new steps
- (Self->VolatileState.LastBlockedPending - PlanOnStep) < volatileLeaseMs/2);
+ // We want to extend lease when only a half is left for new steps
+ (Self->VolatileState.LastBlockedPending - PlanOnStep) <= volatileLeaseMs/2);
if (!needCommit) {
// Avoid making unnecessary commits
diff --git a/ydb/core/tx/coordinator/mediator_queue.cpp b/ydb/core/tx/coordinator/mediator_queue.cpp
index e077fdb6f64..b9c6da2c89d 100644
--- a/ydb/core/tx/coordinator/mediator_queue.cpp
+++ b/ydb/core/tx/coordinator/mediator_queue.cpp
@@ -10,6 +10,16 @@
namespace NKikimr {
namespace NFlatTxCoordinator {
+// Normally we flush on every confirmed step, since usually they are confirmed
+// in the same order we planned them. However the first step in the queue may
+// be blocked by some shard, so make sure we flush when at least 2 other steps
+// have been confirmed.
+static constexpr size_t ConfirmedStepsToFlush = 2;
+
+// Coordinator may need to persist confirmed participants, and we need to limit
+// the number of rows as large transactions are problematic to commit.
+static constexpr size_t ConfirmedParticipantsToFlush = 10'000;
+
void TMediatorStep::SerializeTo(TEvTxCoordinator::TEvCoordinatorStep *msg) const {
for (const TTx &tx : Transactions) {
NKikimrTx::TCoordinatorTransaction *x = msg->Record.AddTransactions();
@@ -37,6 +47,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
THashMap<std::pair<ui64, ui64>, TMediatorStepList::iterator> WaitingAcks;
std::unique_ptr<TMediatorConfirmations> Confirmations;
+ size_t ConfirmedParticipants = 0;
+ size_t ConfirmedSteps = 0;
void Die(const TActorContext &ctx) override {
if (PipeClient) {
@@ -81,6 +93,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
<< " tablet# " << Coordinator << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString()
<< " Owner");
ctx.Send(Owner, new TEvMediatorQueueConfirmations(std::move(Confirmations)));
+ ConfirmedParticipants = 0;
+ ConfirmedSteps = 0;
}
}
@@ -177,8 +191,6 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
SendStep(*it, ctx);
}
}
-
- SendConfirmations(ctx);
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
@@ -214,7 +226,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
<< " HANDLE EvPlanStepAck");
- bool fullyConfirmed = false;
+ bool firstConfirmed = false;
const TTabletId tabletId = record.GetTabletId();
for (const auto txid : record.GetTxId()) {
@@ -224,10 +236,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
if (!Confirmations)
Confirmations.reset(new TMediatorConfirmations(Mediator));
- Confirmations->Acks[txid].insert(tabletId);
+ if (Confirmations->Acks[txid].insert(tabletId).second)
+ ++ConfirmedParticipants;
if (0 == --it->second->References) {
+ ++ConfirmedSteps;
Y_VERIFY(!Queue.empty());
+ if (it->second == Queue.begin()) {
+ firstConfirmed = true;
+ }
auto last = --Queue.end();
if (it->second != last) {
Queue.erase(it->second);
@@ -236,13 +253,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
it->second->Transactions.clear();
it->second->Transactions.shrink_to_fit();
}
- fullyConfirmed = true;
}
WaitingAcks.erase(it);
}
}
- if (fullyConfirmed) {
+ if (firstConfirmed ||
+ ConfirmedSteps >= ConfirmedStepsToFlush ||
+ ConfirmedParticipants >= ConfirmedParticipantsToFlush)
+ {
SendConfirmations(ctx);
}
}
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index bec47176646..366828b3dda 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -100,7 +100,7 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
}
- if (!op->HasResultSentFlag() && (op->IsDirty() || !Pipeline.WaitCompletion(op)))
+ if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
CompleteRequest(op, ctx);
if (!DataShard.IsFollower())