author     snaury <snaury@ydb.tech>   2023-08-14 11:10:12 +0300
committer  snaury <snaury@ydb.tech>   2023-08-14 12:06:43 +0300
commit     f6d091c5a6d57dbe4ac04d86237b21dc2cd40c72
tree       508fabcebabc60f19a1f1193daf786cd706a4cfe
parent     0677981702178f1a002820ddcc517389022d197a
Use volatile planning for volatile transactions KIKIMR-18580
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp  |  4
-rw-r--r--  ydb/core/tx/coordinator/coordinator__plan_step.cpp |  6
-rw-r--r--  ydb/core/tx/coordinator/mediator_queue.cpp         | 31
-rw-r--r--  ydb/core/tx/datashard/finish_propose_unit.cpp      |  2
4 files changed, 32 insertions, 11 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 383a2a34163..f0d11af1cda 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -901,6 +901,10 @@ private:
         transaction.SetMinStep(aggrMinStep);
         transaction.SetMaxStep(aggrMaxStep);
 
+        if (VolatileTx) {
+            transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
+        }
+
         LOG_T("Execute planned transaction, coordinator: " << TxCoordinator);
         Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
     }
diff --git a/ydb/core/tx/coordinator/coordinator__plan_step.cpp b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
index 7b3eb35daf1..28ac658346a 100644
--- a/ydb/core/tx/coordinator/coordinator__plan_step.cpp
+++ b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
@@ -220,10 +220,8 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
         Self->SendStepConfirmations(ProxyPlanConfirmations, ctx);
 
         bool needCommit = (
-            // We want to persist steps that are aligned to configured resolution
-            (PlanOnStep % Self->Config.Resolution) == 0 ||
-            // We want to extend lease when less than half is left for new steps
-            (Self->VolatileState.LastBlockedPending - PlanOnStep) < volatileLeaseMs/2);
+            // We want to extend lease when only a half is left for new steps
+            (Self->VolatileState.LastBlockedPending - PlanOnStep) <= volatileLeaseMs/2);
 
         if (!needCommit) {
             // Avoid making unnecessary commits
diff --git a/ydb/core/tx/coordinator/mediator_queue.cpp b/ydb/core/tx/coordinator/mediator_queue.cpp
index e077fdb6f64..b9c6da2c89d 100644
--- a/ydb/core/tx/coordinator/mediator_queue.cpp
+++ b/ydb/core/tx/coordinator/mediator_queue.cpp
@@ -10,6 +10,16 @@ namespace NKikimr {
 namespace NFlatTxCoordinator {
 
+// Normally we flush on every confirmed step, since usually they are confirmed
+// in the same order we planned them. However the first step in the queue may
+// be blocked by some shard, so make sure we flush when at least 2 other steps
+// have been confirmed.
+static constexpr size_t ConfirmedStepsToFlush = 2;
+
+// Coordinator may need to persist confirmed participants, and we need to limit
+// the number of rows as large transactions are problematic to commit.
+static constexpr size_t ConfirmedParticipantsToFlush = 10'000;
+
 void TMediatorStep::SerializeTo(TEvTxCoordinator::TEvCoordinatorStep *msg) const {
     for (const TTx &tx : Transactions) {
         NKikimrTx::TCoordinatorTransaction *x = msg->Record.AddTransactions();
@@ -37,6 +47,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
     THashMap<std::pair<ui64, ui64>, TMediatorStepList::iterator> WaitingAcks;
 
     std::unique_ptr<TMediatorConfirmations> Confirmations;
+    size_t ConfirmedParticipants = 0;
+    size_t ConfirmedSteps = 0;
 
     void Die(const TActorContext &ctx) override {
         if (PipeClient) {
@@ -81,6 +93,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
                 << " tablet# " << Coordinator
                 << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString() << " Owner");
             ctx.Send(Owner, new TEvMediatorQueueConfirmations(std::move(Confirmations)));
+            ConfirmedParticipants = 0;
+            ConfirmedSteps = 0;
         }
     }
@@ -177,8 +191,6 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
                 SendStep(*it, ctx);
             }
         }
-
-        SendConfirmations(ctx);
     }
 
     void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
@@ -214,7 +226,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
         LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
             << " HANDLE EvPlanStepAck");
 
-        bool fullyConfirmed = false;
+        bool firstConfirmed = false;
 
         const TTabletId tabletId = record.GetTabletId();
         for (const auto txid : record.GetTxId()) {
@@ -224,10 +236,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
                 if (!Confirmations)
                     Confirmations.reset(new TMediatorConfirmations(Mediator));
 
-                Confirmations->Acks[txid].insert(tabletId);
+                if (Confirmations->Acks[txid].insert(tabletId).second)
+                    ++ConfirmedParticipants;
 
                 if (0 == --it->second->References) {
+                    ++ConfirmedSteps;
                     Y_VERIFY(!Queue.empty());
+                    if (it->second == Queue.begin()) {
+                        firstConfirmed = true;
+                    }
                     auto last = --Queue.end();
                     if (it->second != last) {
                         Queue.erase(it->second);
@@ -236,13 +253,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
                         it->second->Transactions.clear();
                         it->second->Transactions.shrink_to_fit();
                     }
-                    fullyConfirmed = true;
                 }
                 WaitingAcks.erase(it);
             }
         }
 
-        if (fullyConfirmed) {
+        if (firstConfirmed ||
+            ConfirmedSteps >= ConfirmedStepsToFlush ||
+            ConfirmedParticipants >= ConfirmedParticipantsToFlush)
+        {
             SendConfirmations(ctx);
         }
     }
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index bec47176646..366828b3dda 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -100,7 +100,7 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
         op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
     }
 
-    if (!op->HasResultSentFlag() && (op->IsDirty() || !Pipeline.WaitCompletion(op)))
+    if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
        CompleteRequest(op, ctx);
 
     if (!DataShard.IsFollower())
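For orientation, the core behavioral change in mediator_queue.cpp above is that acknowledgements from shards are now batched before being reported back to the coordinator: confirmations are flushed when the first (possibly blocking) step in the queue becomes fully confirmed, or when enough steps or participants have accumulated. The sketch below restates that flush heuristic in isolation under stated assumptions: the TConfirmationBatcher class and its method names are illustrative and not part of YDB, while the thresholds and the flush condition mirror ConfirmedStepsToFlush, ConfirmedParticipantsToFlush, and the firstConfirmed check in the diff.

#include <cstddef>

// Illustrative sketch (not YDB code): accumulates acknowledgements and decides
// when the mediator queue should flush confirmations to its owner, mirroring
// the thresholds introduced in mediator_queue.cpp.
class TConfirmationBatcher {
public:
    // Mirror ConfirmedStepsToFlush / ConfirmedParticipantsToFlush from the diff.
    static constexpr size_t StepsToFlush = 2;
    static constexpr size_t ParticipantsToFlush = 10'000;

    // Called when a participant (tablet) acks a transaction for the first time.
    void OnParticipantConfirmed() {
        ++ConfirmedParticipants;
    }

    // Called when a step has no remaining unacknowledged transactions.
    // firstInQueue is true when it was the oldest step in the queue: that step
    // may have been blocking progress, so it should be reported promptly.
    void OnStepConfirmed(bool firstInQueue) {
        ++ConfirmedSteps;
        FirstConfirmed = FirstConfirmed || firstInQueue;
    }

    // Flush when the head of the queue was unblocked, or when enough steps or
    // participants have accumulated to keep the eventual commit small.
    bool ShouldFlush() const {
        return FirstConfirmed ||
               ConfirmedSteps >= StepsToFlush ||
               ConfirmedParticipants >= ParticipantsToFlush;
    }

    // Reset counters after confirmations have been sent, as SendConfirmations
    // does in the diff.
    void Reset() {
        FirstConfirmed = false;
        ConfirmedSteps = 0;
        ConfirmedParticipants = 0;
    }

private:
    bool FirstConfirmed = false;
    size_t ConfirmedSteps = 0;
    size_t ConfirmedParticipants = 0;
};

Flushing on the head of the queue keeps a single blocked step from delaying confirmations indefinitely, while the participant cap bounds the number of rows the coordinator has to persist in one transaction.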