diff options
author | snaury <snaury@ydb.tech> | 2023-07-25 20:04:06 +0300 |
---|---|---|
committer | root <root@qavm-2ed34686.qemu> | 2023-07-25 20:04:06 +0300 |
commit | d49935cc4b833d7adefe349d251a8705fbc8e6fe (patch) | |
tree | cb5686f3908e75f308614e6abdd5d1c0a0680af2 | |
parent | dd49b96fb21fbea107931e3ca4f1d9b292241969 (diff) | |
download | ydb-d49935cc4b833d7adefe349d251a8705fbc8e6fe.tar.gz |
Use in-memory mediator queue with local resends KIKIMR-18580
-rw-r--r-- | ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator.h | 43 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator__plan_step.cpp | 107 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator__restart_mediator.cpp | 111 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator__restore_transaction.cpp | 79 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator_impl.cpp | 177 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator_impl.h | 120 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/coordinator_ut.cpp | 59 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/mediator_queue.cpp | 161 | ||||
-rw-r--r-- | ydb/core/tx/coordinator/ya.make | 2 |
15 files changed, 489 insertions, 411 deletions
diff --git a/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt index 6b5e5d1b2b..8b3bb6b698 100644 --- a/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(core-tx-coordinator PUBLIC cpp-actors-core cpp-actors-helpers cpp-actors-interconnect + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-engine-minikql @@ -39,7 +40,6 @@ target_sources(core-tx-coordinator PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp diff --git a/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt b/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt index 6e0e3565fd..5dd15ca26c 100644 --- a/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-tx-coordinator PUBLIC cpp-actors-core cpp-actors-helpers cpp-actors-interconnect + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-engine-minikql @@ -40,7 +41,6 @@ target_sources(core-tx-coordinator PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp diff --git a/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt index 6e0e3565fd..5dd15ca26c 100644 --- a/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-tx-coordinator PUBLIC cpp-actors-core cpp-actors-helpers cpp-actors-interconnect + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-engine-minikql @@ -40,7 +41,6 @@ target_sources(core-tx-coordinator PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp diff --git a/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt index 6b5e5d1b2b..8b3bb6b698 100644 --- a/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(core-tx-coordinator PUBLIC cpp-actors-core cpp-actors-helpers cpp-actors-interconnect + cpp-containers-absl_flat_hash ydb-core-actorlib_impl ydb-core-base core-engine-minikql @@ -39,7 +40,6 @@ target_sources(core-tx-coordinator PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp diff --git a/ydb/core/tx/coordinator/coordinator.cpp b/ydb/core/tx/coordinator/coordinator.cpp index 3e76fe192e..bdcabefbb3 100644 --- a/ydb/core/tx/coordinator/coordinator.cpp +++ b/ydb/core/tx/coordinator/coordinator.cpp @@ -18,10 +18,6 @@ NKikimr::TEvTxCoordinator::TEvCoordinatorStep::TEvCoordinatorStep(const NFlatTxC } } -NKikimr::TEvTxCoordinator::TEvMediatorQueueConfirmations::TEvMediatorQueueConfirmations(TAutoPtr<NKikimr::NFlatTxCoordinator::TMediatorConfirmations> &confirmations) -: Confirmations(confirmations) -{} - NKikimr::TEvTxCoordinator::TEvCoordinatorStepResult::TEvCoordinatorStepResult(NKikimrTx::TEvCoordinatorStepResult::EStatus status, ui64 step, ui64 completeStep, ui64 latestKnown, ui64 subjectiveTime, ui64 mediator, ui64 coordinator) { Record.SetStatus(status); @@ -55,22 +51,3 @@ NKikimr::TEvTxCoordinator::TEvCoordinatorSyncResult::TEvCoordinatorSyncResult(ui Record.SetMediatorID(mediator); Record.SetCoordinatorID(coordinator); } - -NKikimr::TEvTxCoordinator::TEvMediatorQueueStep::TEvMediatorQueueStep(ui64 genCookie, TAutoPtr<NKikimr::NFlatTxCoordinator::TMediatorStep> step) -: GenCookie(genCookie) -, Step(step) -{} - -NKikimr::TEvTxCoordinator::TEvMediatorQueueRestart::TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie) -: MediatorId(mediatorId) -, StartFrom(startFrom) -, GenCookie(genCookie) -{} - -NKikimr::TEvTxCoordinator::TEvMediatorQueueStop::TEvMediatorQueueStop(ui64 mediatorId) -: MediatorId(mediatorId) -{} - -NKikimr::TEvTxCoordinator::TEvCoordinatorConfirmPlan::TEvCoordinatorConfirmPlan(TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> &confirmations) -: Confirmations(confirmations) -{} diff --git a/ydb/core/tx/coordinator/coordinator.h b/ydb/core/tx/coordinator/coordinator.h index 771fed39ca..a7b03f5cac 100644 --- a/ydb/core/tx/coordinator/coordinator.h +++ b/ydb/core/tx/coordinator/coordinator.h @@ -1,7 +1,6 @@ #pragma once #include "defs.h" #include <ydb/core/tx/tx.h> -#include <ydb/core/util/queue_oneone_inplace.h> #include <util/generic/bitmap.h> #include <util/generic/set.h> #include <util/generic/hash.h> @@ -9,15 +8,11 @@ #include <util/generic/hash_set.h> namespace NKikimr { + namespace NFlatTxCoordinator { struct TMediatorStep; - struct TMediatorConfirmations; - struct TCoordinatorStepConfirmations; -} } -namespace NKikimr { - IActor* CreateFlatTxCoordinator(const TActorId &tablet, TTabletStorageInfo *info); struct TEvTxCoordinator { @@ -33,7 +28,7 @@ struct TEvTxCoordinator { EvMediatorQueueStop, EvMediatorQueueConfirmations, - EvCoordinatorConfirmPlan = EvCoordinatorStep + 3 * 512, + // deprecated: EvCoordinatorConfirmPlan = EvCoordinatorStep + 3 * 512, EvEnd }; @@ -73,40 +68,6 @@ struct TEvTxCoordinator { }; - // must be explicit queue? - struct TEvMediatorQueueStep : public TEventLocal<TEvMediatorQueueStep, EvMediatorQueueStep> { - const ui64 GenCookie; - TAutoPtr<NFlatTxCoordinator::TMediatorStep> Step; - - TEvMediatorQueueStep(ui64 genCookie, TAutoPtr<NFlatTxCoordinator::TMediatorStep> step); - }; - - struct TEvMediatorQueueRestart : public TEventLocal<TEvMediatorQueueRestart, EvMediatorQueueRestart> { - const ui64 MediatorId; - const ui64 StartFrom; - const ui64 GenCookie; - - TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie); - }; - - struct TEvMediatorQueueStop : public TEventLocal<TEvMediatorQueueStop, EvMediatorQueueStop> { - const ui64 MediatorId; - - TEvMediatorQueueStop(ui64 mediatorId); - }; - - struct TEvMediatorQueueConfirmations : public TEventLocal<TEvMediatorQueueConfirmations, EvMediatorQueueConfirmations> { - TAutoPtr<NFlatTxCoordinator::TMediatorConfirmations> Confirmations; - - TEvMediatorQueueConfirmations(TAutoPtr<NFlatTxCoordinator::TMediatorConfirmations> &confirmations); - }; - - struct TEvCoordinatorConfirmPlan : public TEventLocal<TEvCoordinatorConfirmPlan, EvCoordinatorConfirmPlan> { - TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> Confirmations; - - TEvCoordinatorConfirmPlan(TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> &confirmations); - }; - }; } diff --git a/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp b/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp index 28a644503d..3f15ce76a7 100644 --- a/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp +++ b/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp @@ -8,12 +8,12 @@ namespace NKikimr { namespace NFlatTxCoordinator { struct TTxCoordinator::TTxMediatorConfirmations : public TTransactionBase<TTxCoordinator> { - TAutoPtr<TMediatorConfirmations> Confirmations; + std::unique_ptr<TMediatorConfirmations> Confirmations; i64 CompleteTransactions; - TTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations, TSelf *coordinator) + TTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations, TSelf *coordinator) : TBase(coordinator) - , Confirmations(confirmations) + , Confirmations(std::move(confirmations)) , CompleteTransactions(0) {} @@ -82,8 +82,8 @@ struct TTxCoordinator::TTxMediatorConfirmations : public TTransactionBase<TTxCoo } }; -ITransaction* TTxCoordinator::CreateTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations) { - return new TTxMediatorConfirmations(confirmations, this); +ITransaction* TTxCoordinator::CreateTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations) { + return new TTxMediatorConfirmations(std::move(confirmations), this); } } diff --git a/ydb/core/tx/coordinator/coordinator__plan_step.cpp b/ydb/core/tx/coordinator/coordinator__plan_step.cpp index b328d015c4..82db717845 100644 --- a/ydb/core/tx/coordinator/coordinator__plan_step.cpp +++ b/ydb/core/tx/coordinator/coordinator__plan_step.cpp @@ -19,24 +19,24 @@ struct TInFlyAccountant { struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { const ui64 PlanOnStep; - TVector<TQueueType::TSlot> Slots; + std::deque<TQueueType::TSlot> Slots; - TMap<ui64, std::pair<ui64, bool *>> StepsToConfirm; - TAutoPtr<TCoordinatorStepConfirmations> ProxyPlanConfirmations; + TMap<ui64, TMediatorStepList::iterator> StepsToConfirm; + TCoordinatorStepConfirmations ProxyPlanConfirmations; TInstant ExecStartMoment; ui64 PlannedCounter; ui64 DeclinedCounter; TInFlyAccountant InFlyAccountant; - TTxPlanStep(ui64 toPlan, TVector<TQueueType::TSlot> &slots, TSelf *coordinator) + TTxPlanStep(ui64 toPlan, std::deque<TQueueType::TSlot> &&slots, TSelf *coordinator) : TBase(coordinator) , PlanOnStep(toPlan) + , Slots(std::move(slots)) , PlannedCounter(0) , DeclinedCounter(0) , InFlyAccountant(Self->MonCounters.StepsInFly) { - Slots.swap(slots); } void Plan(TTransactionContext &txc, const TActorContext &ctx) { @@ -46,40 +46,37 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { const bool lowDiskSpace = Self->Executor()->GetStats().IsAnyChannelYellowStop; THashSet<TTxId> newTransactions; - TVector<TAutoPtr<TMediatorStep>> mediatorSteps; + TVector<TMediatorStep> mediatorSteps; THashMap<TTabletId, TVector<TTabletId>> byMediatorAffected; // first fill every mediator with something (every mediator must receive step) const ui32 mediatorsSize = Self->Config.Mediators->List().size(); mediatorSteps.reserve(mediatorsSize); for (TTabletId mediatorId : Self->Config.Mediators->List()) { - mediatorSteps.push_back(new TMediatorStep(mediatorId, PlanOnStep)); + mediatorSteps.emplace_back(mediatorId, PlanOnStep); } // create mediator steps - ProxyPlanConfirmations.Reset(new TCoordinatorStepConfirmations(PlanOnStep)); - for (const auto &slot : Slots) { - TQueueType::TQ &queue = *slot.Queue; - TQueueType::TQ::TReadIterator iterator = queue.Iterator(); - while (TTransactionProposal *proposal = iterator.Next()) { + for (auto &slot : Slots) { + for (auto &proposal : slot) { for (auto &x : byMediatorAffected) { x.second.clear(); } - const TTxId txId = proposal->TxId; + const TTxId txId = proposal.TxId; Y_VERIFY(txId); Self->MonCounters.StepConsideredTx->Inc(); - auto durationMs = (ExecStartMoment - proposal->AcceptMoment).MilliSeconds(); + auto durationMs = (ExecStartMoment - proposal.AcceptMoment).MilliSeconds(); Self->MonCounters.TxFromReceiveToPlan->Collect(durationMs); - if (proposal->MaxStep < PlanOnStep) { + if (proposal.MaxStep < PlanOnStep) { Self->MonCounters.StepOutdatedTx->Inc(); - ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry { + ProxyPlanConfirmations.Queue.emplace_back( txId, - proposal->Proxy, + proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, - 0 }); + 0); ++DeclinedCounter; continue; } @@ -87,11 +84,11 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { // check is transactions already processed? if (newTransactions.insert(txId).second == false) { Self->MonCounters.StepPlannedDeclinedTx->Inc(); - ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry { + ProxyPlanConfirmations.Queue.emplace_back( txId, - proposal->Proxy, + proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned, - PlanOnStep }); + PlanOnStep); ++DeclinedCounter; continue; } @@ -100,24 +97,23 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { auto it = Self->Transactions.find(txId); if (it != Self->Transactions.end()) { Self->MonCounters.StepPlannedDeclinedTx->Inc(); - ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry { + ProxyPlanConfirmations.Queue.emplace_back( txId, - proposal->Proxy, + proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned, - it->second.PlanOnStep }); + it->second.PlanOnStep); ++DeclinedCounter; continue; } } - if (lowDiskSpace && !proposal->IgnoreLowDiskSpace) { + if (lowDiskSpace && !proposal.IgnoreLowDiskSpace) { Self->MonCounters.StepDeclinedNoSpaceTx->Inc(); - ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry{ + ProxyPlanConfirmations.Queue.emplace_back( txId, - proposal->Proxy, + proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclinedNoSpace, - 0 - }); + 0); ++DeclinedCounter; continue; } @@ -128,8 +124,8 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { TTransaction& transaction = Self->Transactions[txId]; transaction.PlanOnStep = PlanOnStep; - Y_VERIFY(!proposal->AffectedSet.empty()); - for (const auto &txprop : proposal->AffectedSet) { + Y_VERIFY(!proposal.AffectedSet.empty()); + for (const auto &txprop : proposal.AffectedSet) { const TTabletId affectedTablet = txprop.TabletId; const TTabletId mediatorId = Self->Config.Mediators->Select(affectedTablet); @@ -149,10 +145,10 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { } for (ui32 idx = 0; idx < mediatorsSize; ++idx) { - TTabletId mediatorId = mediatorSteps[idx]->MediatorId; + TTabletId mediatorId = mediatorSteps[idx].MediatorId; TVector<TTabletId> &affected = byMediatorAffected[mediatorId]; if (!affected.empty()) { - mediatorSteps[idx]->Transactions.push_back(TMediatorStep::TTx(txId, &affected.front(), affected.size(), 0)); + mediatorSteps[idx].Transactions.emplace_back(txId, affected.data(), affected.size(), 0); } } @@ -160,19 +156,19 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { ++PlannedCounter; Self->MonCounters.StepPlannedTx->Inc(); - ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry { + ProxyPlanConfirmations.Queue.emplace_back( txId, - proposal->Proxy, + proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned, - PlanOnStep } ); + PlanOnStep); } } - for (const TAutoPtr<TMediatorStep> &mp : mediatorSteps) { - const ui64 mediatorId = mp->MediatorId; + for (TMediatorStep& m : mediatorSteps) { + const ui64 mediatorId = m.MediatorId; // write mediator entry - for (const auto &tx : mp->Transactions) { + for (const auto &tx : m.Transactions) { for (TTabletId tablet : tx.PushToAffected) { db.Table<Schema::AffectedSet>().Key(mediatorId, tx.TxId, tablet).Update(); FLOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "Planned transaction " << tx.TxId << " for mediator " << mediatorId << " tablet " << tablet); @@ -180,12 +176,12 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { } TMediator& mediator = Self->Mediator(mediatorId, ctx); - if (mediator.PushUpdates) { - StepsToConfirm[mediatorId] = std::pair<ui64, bool *>(mediator.GenCookie, &mp->Confirmed); - mediator.Queue->Push(mp.Release()); - } else if (!StepsToConfirm.empty()) { - FLOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "PushUpdates false for mediator " << mediatorId << " step " << PlanOnStep); + if (!mediator.Queue.empty() && mediator.Queue.back().Confirmed && mediator.Queue.back().Transactions.empty()) { + // Remove the last confirmed empty step } + mediator.Queue.emplace_back(std::move(m)); + auto it = --mediator.Queue.end(); + StepsToConfirm[mediatorId] = it; } db.Table<Schema::State>().Key(Schema::State::KeyLastPlanned).Update(NIceDb::TUpdate<Schema::State::StateValue>(PlanOnStep)); } @@ -210,24 +206,31 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> { auto durationMs = (ctx.Now() - ExecStartMoment).MilliSeconds(); Self->MonCounters.TxPlanLatency->Collect(durationMs); - for (auto &cx : StepsToConfirm) { - const ui64 mediatorId = cx.first; + for (auto &pr : StepsToConfirm) { + const ui64 mediatorId = pr.first; TMediator &mediator = Self->Mediator(mediatorId, ctx); - if (mediator.GenCookie == cx.second.first) { - *cx.second.second = true; - Self->SendMediatorStep(mediator, ctx); + Y_VERIFY(!mediator.Queue.empty()); + pr.second->Confirmed = true; + for (auto it = pr.second; it != mediator.Queue.begin();) { + --it; + if (!it->Confirmed) break; + if (!it->Transactions.empty()) break; + // Remove empty confirmed steps before us + // Needed so the queue does not grow for disconnected mediators + mediator.Queue.erase(it++); } + Self->SendMediatorStep(mediator, ctx); } - ctx.Send(ctx.SelfID, new TEvTxCoordinator::TEvCoordinatorConfirmPlan(ProxyPlanConfirmations)); + Self->SendStepConfirmations(ProxyPlanConfirmations, ctx); // uncomment this to enable consistency self-check //Self->Execute(Self->CreateTxConsistencyCheck(), ctx); } }; -ITransaction* TTxCoordinator::CreateTxPlanStep(ui64 toStep, TVector<TQueueType::TSlot> &slots) { - return new TTxPlanStep(toStep, slots, this); +ITransaction* TTxCoordinator::CreateTxPlanStep(ui64 toStep, std::deque<TQueueType::TSlot> &&slots) { + return new TTxPlanStep(toStep, std::move(slots), this); } } diff --git a/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp b/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp deleted file mode 100644 index 673115d22b..0000000000 --- a/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp +++ /dev/null @@ -1,111 +0,0 @@ -#include "coordinator_impl.h" - -namespace NKikimr { -namespace NFlatTxCoordinator { - -struct TTxCoordinator::TTxRestartMediatorQueue : public TTransactionBase<TTxCoordinator> { - const TTabletId MediatorId; - const ui64 GenCookie; - - TVector<bool *> StepsToConfirm; - - TTxRestartMediatorQueue(ui64 mediatorId, ui64 genCookie, TSelf *coordinator) - : TBase(coordinator) - , MediatorId(mediatorId) - , GenCookie(genCookie) - {} - - TTxType GetTxType() const override { return TXTYPE_RESTART_MEDIATOR; } - - bool Execute(TTransactionContext &txc, const TActorContext &ctx) override { - TMediator &mediator = Self->Mediator(MediatorId, ctx); - if (mediator.GenCookie != GenCookie) - return true; - - THashMap<TTxId,TVector<TTabletId>> pushToAffectedBuffer; - TVector<TAutoPtr<TMediatorStep>> mediatorSteps; - - if (!Self->RestoreMediatorInfo(MediatorId, mediatorSteps, txc, pushToAffectedBuffer)) - return false; - - for (const auto& it : pushToAffectedBuffer) { - TTransaction& transaction = Self->Transactions[it.first]; - THashSet<TTabletId>& unconfirmedAffectedSet = transaction.UnconfirmedAffectedSet[MediatorId]; - Y_VERIFY(unconfirmedAffectedSet.size() == it.second.size(), - "Incosistent affected set in mem in DB for txId %" PRIu64, it.first); - for (const TTabletId affectedTabletId : it.second) { - Y_VERIFY(unconfirmedAffectedSet.contains(affectedTabletId), - "Incosistent affected set in mem in DB for txId %" PRIu64 " missing tabletId %" PRIu64, - it.first, affectedTabletId); - } - } - - for (const auto &mp : mediatorSteps) { - StepsToConfirm.push_back(&mp->Confirmed); - mediator.Queue->Push(mp.Release()); - } - - mediator.PushUpdates = true; - return true; - } - - void Complete(const TActorContext &ctx) override { - TMediator &mediator = Self->Mediator(MediatorId, ctx); - if (mediator.GenCookie != GenCookie) - return; - - for (bool *x : StepsToConfirm) - *x = true; - - Self->SendMediatorStep(mediator, ctx); - } -}; - -ITransaction* TTxCoordinator::CreateTxRestartMediatorQueue(TTabletId mediatorId, ui64 genCookie) { - return new TTxRestartMediatorQueue(mediatorId, genCookie, this); -} - -bool TTxCoordinator::RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const { - NIceDb::TNiceDb db(txc.DB); - pushToAffected.clear(); - planned.clear(); - - auto rowset = db.Table<Schema::AffectedSet>().Range(mediatorId).Select(); - if (!rowset.IsReady()) - return false; - - // Later we will need this to be sorted by stepId - TMap<TStepId, TAutoPtr<TMediatorStep>> mediatorSteps; - - while (!rowset.EndOfSet()) { - const TTxId txId = rowset.GetValue<Schema::AffectedSet::TransactionID>(); - auto itTransaction = Transactions.find(txId); - Y_VERIFY(itTransaction != Transactions.end()); - - TStepId step = itTransaction->second.PlanOnStep; - auto itStep = mediatorSteps.find(step); - if (itStep == mediatorSteps.end()) { - itStep = mediatorSteps.insert(std::make_pair(step, new TMediatorStep(mediatorId, step))).first; - } - TAutoPtr<TMediatorStep>& mediatorStep = itStep->second; - - if (mediatorStep->Transactions.empty() || mediatorStep->Transactions.back().TxId != txId) - mediatorStep->Transactions.push_back(TMediatorStep::TTx(txId)); - TMediatorStep::TTx &tx = mediatorStep->Transactions.back(); - - TTabletId tablet = rowset.GetValue<Schema::AffectedSet::DataShardID>(); - pushToAffected[txId].push_back(tablet); - tx.PushToAffected.push_back(tablet); - tx.Moderator = 0; - if (!rowset.Next()) - return false; - } - - for (auto& pr : mediatorSteps) - planned.push_back(pr.second); - - return true; -} - -} -} diff --git a/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp b/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp index 53117e2a57..97d428d77e 100644 --- a/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp +++ b/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp @@ -13,6 +13,52 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord : TBase(coordinator) {} + struct TMediatorState { + TMediatorStepList Steps; + THashMap<TStepId, TMediatorStepList::iterator> Index; + }; + + THashMap<TTabletId, TMediatorState> Mediators; + + TMediatorStep& GetMediatorStep(TTabletId mediatorId, TStepId step) { + auto& state = Mediators[mediatorId]; + + auto it = state.Index.find(step); + if (it != state.Index.end()) { + return *it->second; + } + + auto& entry = state.Steps.emplace_back(mediatorId, step); + state.Index[step] = --state.Steps.end(); + return entry; + } + + TMediatorStep::TTx& GetMediatorTx(TTabletId mediatorId, TStepId step, TTxId txId) { + auto& medStep = GetMediatorStep(mediatorId, step); + if (medStep.Transactions.empty() || medStep.Transactions.back().TxId < txId) { + return medStep.Transactions.emplace_back(txId); + } + auto& medTx = medStep.Transactions.back(); + Y_VERIFY_S(medTx.TxId == txId, "Transaction loading must be ordered by TxId:" + << " Mediator# " << mediatorId + << " step# " << step + << " TxId# " << txId + << " PrevTxId# " << medTx.TxId); + return medTx; + } + + void EnsureLastMediatorStep(TTabletId mediatorId, TStepId step) { + auto& state = Mediators[mediatorId]; + if (!state.Steps.empty()) { + auto it = --state.Steps.end(); + if (step <= it->Step) { + return; // nothing to do + } + } + state.Steps.emplace_back(mediatorId, step); + state.Index[step] = --state.Steps.end(); + } + bool Restore(TTransactions &transactions, TTransactionContext &txc, const TActorContext &ctx) { Y_UNUSED(ctx); NIceDb::TNiceDb db(txc.DB); @@ -35,6 +81,8 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord } } + Mediators.clear(); + { int errors = 0; auto rowset = db.Table<Schema::AffectedSet>().Range().Select(); @@ -48,7 +96,10 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord auto itTransaction = transactions.find(txId); if (itTransaction != transactions.end()) { - itTransaction->second.UnconfirmedAffectedSet[medId].insert(affectedShardId); + auto& tx = itTransaction->second; + tx.UnconfirmedAffectedSet[medId].insert(affectedShardId); + auto& medTx = GetMediatorTx(medId, tx.PlanOnStep, txId); + medTx.PushToAffected.push_back(affectedShardId); } else { LOG_ERROR_S(ctx, NKikimrServices::TX_COORDINATOR, "Transaction not found: MedId = " << medId << " TxId = " << txId << " DataShardId = " << affectedShardId); ++errors; @@ -85,14 +136,36 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord Self->Transactions.swap(transactions); *Self->MonCounters.TxInFly += txCounter; Self->MonCounters.CurrentTxInFly = txCounter; + + // Prepare mediator queues + for (ui64 mediatorId : Self->Config.Mediators->List()) { + auto& state = Mediators[mediatorId]; + // We need to slice steps in a sorted order + state.Steps.sort([](const TMediatorStep& a, const TMediatorStep& b) -> bool { + return a.Step < b.Step; + }); + // Make sure each mediator will receive last planned step + if (Self->VolatileState.LastPlanned != 0) { + EnsureLastMediatorStep(mediatorId, Self->VolatileState.LastPlanned); + } + // Splice all steps to the queue + TMediator& mediator = Self->Mediator(mediatorId, ctx); + Y_VERIFY(mediator.Queue.empty()); + mediator.Queue.splice(mediator.Queue.end(), state.Steps); + } return true; } void Complete(const TActorContext &ctx) override { - // start mediator queues + // Send steps to connected queues for (ui64 mediatorId: Self->Config.Mediators->List()) { TMediator &mediator = Self->Mediator(mediatorId, ctx); - Y_UNUSED(mediator); + auto& state = Mediators[mediatorId]; + for (auto& pr : state.Index) { + Y_VERIFY(!pr.second->Confirmed); + pr.second->Confirmed = true; + } + Self->SendMediatorStep(mediator, ctx); } // start plan process. diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp index 1edc81a258..0af758a563 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.cpp +++ b/ydb/core/tx/coordinator/coordinator_impl.cpp @@ -26,7 +26,7 @@ static void SendTransactionStatus(const TActorId &proxy, TEvTxProxy::TEvProposeT ctx.Send(proxy, new TEvTxProxy::TEvProposeTransactionStatus(status, txid, stepId)); } -static TAutoPtr<TTransactionProposal> MakeTransactionProposal(TEvTxProxy::TEvProposeTransaction::TPtr &ev, ::NMonitoring::TDynamicCounters::TCounterPtr &counter) { +static TTransactionProposal MakeTransactionProposal(TEvTxProxy::TEvProposeTransaction::TPtr &ev, ::NMonitoring::TDynamicCounters::TCounterPtr &counter) { const TActorId &sender = ev->Sender; const NKikimrTx::TEvProposeTransaction &record = ev->Get()->Record; @@ -36,11 +36,11 @@ static TAutoPtr<TTransactionProposal> MakeTransactionProposal(TEvTxProxy::TEvPro const ui64 maxStep = txrec.HasMaxStep() ? txrec.GetMaxStep() : Max<ui64>(); const bool ignoreLowDiskSpace = txrec.GetIgnoreLowDiskSpace(); - TAutoPtr<TTransactionProposal> proposal(new TTransactionProposal(sender, txId, minStep, maxStep, ignoreLowDiskSpace)); - proposal->AffectedSet.resize(txrec.AffectedSetSize()); + TTransactionProposal proposal(sender, txId, minStep, maxStep, ignoreLowDiskSpace); + proposal.AffectedSet.resize(txrec.AffectedSetSize()); for (ui32 i = 0, e = txrec.AffectedSetSize(); i != e; ++i) { const auto &x = txrec.GetAffectedSet(i); - auto &s = proposal->AffectedSet[i]; + auto &s = proposal.AffectedSet[i]; s.TabletId = x.GetTabletId(); Y_ASSERT(x.GetFlags() > 0 && x.GetFlags() <= 3); @@ -77,32 +77,32 @@ TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet) TabletCounters = TabletCountersPtr.Get(); } -void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TActorContext &ctx) { - proposal->AcceptMoment = ctx.Now(); +void TTxCoordinator::PlanTx(TTransactionProposal &&proposal, const TActorContext &ctx) { + proposal.AcceptMoment = ctx.Now(); MonCounters.PlanTxCalls->Inc(); - if (proposal->MaxStep <= VolatileState.LastPlanned) { + if (proposal.MaxStep <= VolatileState.LastPlanned) { MonCounters.PlanTxOutdated->Inc(); - return SendTransactionStatus(proposal->Proxy + return SendTransactionStatus(proposal.Proxy , TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated - , proposal->TxId, 0, ctx, TabletID()); + , proposal.TxId, 0, ctx, TabletID()); } if (Stopping) { - return SendTransactionStatus(proposal->Proxy, + return SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting, - proposal->TxId, 0, ctx, TabletID()); + proposal.TxId, 0, ctx, TabletID()); } - bool forRapidExecution = (proposal->MinStep <= VolatileState.LastPlanned); + bool forRapidExecution = (proposal.MinStep <= VolatileState.LastPlanned); ui64 planStep = 0; // cycle for flow control - for (ui64 cstep = (proposal->MinStep + Config.Resolution - 1) / Config.Resolution * Config.Resolution; /*no-op*/;cstep += Config.Resolution) { - if (cstep >= proposal->MaxStep) { + for (ui64 cstep = (proposal.MinStep + Config.Resolution - 1) / Config.Resolution * Config.Resolution; /*no-op*/;cstep += Config.Resolution) { + if (cstep >= proposal.MaxStep) { MonCounters.PlanTxOutdated->Inc(); - return SendTransactionStatus(proposal->Proxy, - TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, proposal->TxId, 0, ctx, TabletID()); + return SendTransactionStatus(proposal.Proxy, + TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, proposal.TxId, 0, ctx, TabletID()); } if (forRapidExecution) { @@ -117,15 +117,14 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct } MonCounters.PlanTxAccepted->Inc(); - SendTransactionStatus(proposal->Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted, - proposal->TxId, planStep, ctx, TabletID()); + SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted, + proposal.TxId, planStep, ctx, TabletID()); if (forRapidExecution) { TQueueType::TSlot &rapidSlot = VolatileState.Queue.RapidSlot; - rapidSlot.Queue->Push(proposal.Release()); - ++rapidSlot.QueueSize; + rapidSlot.push_back(std::move(proposal)); - if (rapidSlot.QueueSize >= Config.RapidSlotFlushSize) { + if (rapidSlot.size() >= Config.RapidSlotFlushSize) { SchedulePlanTickExact(planStep); } @@ -133,8 +132,7 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct SchedulePlanTickAligned(planStep); } else { TQueueType::TSlot &planSlot = VolatileState.Queue.LowSlot(planStep); - planSlot.Queue->Push(proposal.Release()); - ++planSlot.QueueSize; + planSlot.push_back(std::move(proposal)); // We may be sleeping at reduced resolution, try to wake up sooner SchedulePlanTickExact(planStep); @@ -142,29 +140,29 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct } void TTxCoordinator::Handle(TEvTxProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { - TAutoPtr<TTransactionProposal> proposal = MakeTransactionProposal(ev, MonCounters.TxIn); - LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal->TxId + TTransactionProposal proposal = MakeTransactionProposal(ev, MonCounters.TxIn); + LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal.TxId << " HANDLE EvProposeTransaction marker# C0"); - PlanTx(proposal, ctx); + PlanTx(std::move(proposal), ctx); } void TTxCoordinator::HandleEnqueue(TEvTxProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { TryInitMonCounters(ctx); - TAutoPtr<TTransactionProposal> proposal = MakeTransactionProposal(ev, MonCounters.TxIn); - LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal->TxId + TTransactionProposal proposal = MakeTransactionProposal(ev, MonCounters.TxIn); + LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal.TxId << " HANDLE Enqueue EvProposeTransaction"); if (Y_UNLIKELY(Stopping)) { - return SendTransactionStatus(proposal->Proxy, + return SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting, - proposal->TxId, 0, ctx, TabletID()); + proposal.TxId, 0, ctx, TabletID()); } if (!VolatileState.Queue.Unsorted) - VolatileState.Queue.Unsorted.Reset(new TQueueType::TQ()); + VolatileState.Queue.Unsorted.emplace(); - VolatileState.Queue.Unsorted->Push(proposal.Release()); + VolatileState.Queue.Unsorted->push_back(std::move(proposal)); } bool TTxCoordinator::AllowReducedPlanResolution() const { @@ -281,9 +279,12 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte } if (VolatileState.Queue.Unsorted) { - while (TAutoPtr<TTransactionProposal> x = VolatileState.Queue.Unsorted->Pop()) - PlanTx(x, ctx); - VolatileState.Queue.Unsorted.Destroy(); + while (!VolatileState.Queue.Unsorted->empty()) { + auto& proposal = VolatileState.Queue.Unsorted->front(); + PlanTx(std::move(proposal), ctx); + VolatileState.Queue.Unsorted->pop_front(); + } + VolatileState.Queue.Unsorted.reset(); } const ui64 resolution = Config.Resolution; @@ -307,14 +308,11 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte return SchedulePlanTick(); } - TVector<TQueueType::TSlot> slots; + std::deque<TQueueType::TSlot> slots; - if (VolatileState.Queue.RapidSlot.QueueSize) { - if (slots.empty()) { - slots.reserve(1000); - } - slots.push_back(VolatileState.Queue.RapidSlot); - VolatileState.Queue.RapidSlot = TQueueType::TSlot(); + if (!VolatileState.Queue.RapidSlot.empty()) { + slots.push_back(std::move(VolatileState.Queue.RapidSlot)); + VolatileState.Queue.RapidSlot.clear(); } while (!VolatileState.Queue.Low.empty()) { @@ -322,11 +320,8 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte if (frontIt->first > next) break; - if (frontIt->second.QueueSize) { - if (slots.empty()) { - slots.reserve(1000); - } - slots.push_back(frontIt->second); + if (!frontIt->second.empty()) { + slots.push_back(std::move(frontIt->second)); } VolatileState.Queue.Low.erase(frontIt); @@ -338,46 +333,43 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte VolatileState.LastPlanned = next; NotifyUpdatedLastStep(); - Execute(CreateTxPlanStep(next, slots), ctx); + Execute(CreateTxPlanStep(next, std::move(slots)), ctx); SchedulePlanTick(); } -void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx) { - TEvTxCoordinator::TEvMediatorQueueConfirmations *msg = ev->Get(); +void TTxCoordinator::Handle(TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx) { + TEvMediatorQueueConfirmations *msg = ev->Get(); LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE EvMediatorQueueConfirmations MediatorId# " << msg->Confirmations->MediatorId); - Execute(CreateTxMediatorConfirmations(msg->Confirmations), ctx); + Execute(CreateTxMediatorConfirmations(std::move(msg->Confirmations)), ctx); } -void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx) { - const TEvTxCoordinator::TEvMediatorQueueStop *msg = ev->Get(); +void TTxCoordinator::Handle(TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx) { + const TEvMediatorQueueStop *msg = ev->Get(); LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE EvMediatorQueueStop MediatorId# " << msg->MediatorId); TMediator &mediator = Mediator(msg->MediatorId, ctx); - mediator.PushUpdates = false; - mediator.GenCookie = Max<ui64>(); - mediator.Queue.Destroy(); + mediator.Active = false; } -void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx) { - const TEvTxCoordinator::TEvMediatorQueueRestart *msg = ev->Get(); +void TTxCoordinator::Handle(TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx) { + const TEvMediatorQueueRestart *msg = ev->Get(); LOG_NOTICE_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE EvMediatorQueueRestart MediatorId# " << msg->MediatorId); TMediator &mediator = Mediator(msg->MediatorId, ctx); - mediator.PushUpdates = false; - mediator.GenCookie = msg->GenCookie; - mediator.Queue.Reset(new TMediator::TStepQueue()); - Execute(CreateTxRestartMediatorQueue(msg->MediatorId, msg->GenCookie), ctx); + mediator.Active = true; + SendMediatorStep(mediator, ctx); } -void TTxCoordinator::Handle(TEvTxCoordinator::TEvCoordinatorConfirmPlan::TPtr &ev, const TActorContext &ctx) { - TAutoPtr<TCoordinatorStepConfirmations> confirmations = ev->Get()->Confirmations; - while (TAutoPtr<TCoordinatorStepConfirmations::TEntry> x = confirmations->Queue->Pop()) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << x->TxId - << " stepId# " << x->Step << " Status# " << x->Status - << " SEND EvProposeTransactionStatus to# " << x->ProxyId.ToString() << " Proxy"); - ctx.Send(x->ProxyId, new TEvTxProxy::TEvProposeTransactionStatus(x->Status, x->TxId, x->Step)); +void TTxCoordinator::SendStepConfirmations(TCoordinatorStepConfirmations &confirmations, const TActorContext &ctx) { + while (!confirmations.Queue.empty()) { + auto &x = confirmations.Queue.front(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << x.TxId + << " stepId# " << x.Step << " Status# " << x.Status + << " SEND EvProposeTransactionStatus to# " << x.ProxyId.ToString() << " Proxy"); + ctx.Send(x.ProxyId, new TEvTxProxy::TEvProposeTransactionStatus(x.Status, x.TxId, x.Step)); + confirmations.Queue.pop_front(); } } @@ -503,24 +495,39 @@ void TTxCoordinator::TryInitMonCounters(const TActorContext &ctx) { } void TTxCoordinator::SendMediatorStep(TMediator &mediator, const TActorContext &ctx) { - while (TMediatorStep *step = mediator.Queue->Head()) { - if (!step->Confirmed) - return; + if (!mediator.Active) { + // We don't want to update LastSentStep when mediators are not empty + return; + } - TAutoPtr<TMediatorStep> extracted = mediator.Queue->Pop(); - for (const auto& tx: extracted->Transactions) { + std::unique_ptr<TEvMediatorQueueStep> msg; + while (!mediator.Queue.empty()) { + auto it = mediator.Queue.begin(); + if (!it->Confirmed) { + break; + } + + for (const auto& tx: it->Transactions) { LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "Send from# " << TabletID() - << " to mediator# " << extracted->MediatorId << ", step# " << extracted->Step + << " to mediator# " << it->MediatorId << ", step# " << it->Step << ", txid# " << tx.TxId << " marker# C2"); } - if (VolatileState.LastSentStep < extracted->Step) { - VolatileState.LastSentStep = extracted->Step; + if (VolatileState.LastSentStep < it->Step) { + VolatileState.LastSentStep = it->Step; if (ReadStepSubscriptionManager) { ctx.Send(ReadStepSubscriptionManager, new TEvPrivate::TEvReadStepUpdated(VolatileState.LastSentStep)); } } - ctx.Send(mediator.QueueActor, new TEvTxCoordinator::TEvMediatorQueueStep(mediator.GenCookie, extracted)); + + if (!msg) { + msg.reset(new TEvMediatorQueueStep()); + } + msg->SpliceStep(mediator.Queue, it); + } + + if (msg) { + ctx.Send(mediator.QueueActor, msg.release()); } } @@ -565,22 +572,24 @@ void TTxCoordinator::OnTabletStop(TEvTablet::TEvTabletStop::TPtr &ev, const TAct void TTxCoordinator::OnStopGuardStarting(const TActorContext &ctx) { auto processQueue = [&](auto &queue) { - while (TAutoPtr<TTransactionProposal> proposal = queue.Pop()) { - SendTransactionStatus(proposal->Proxy, + while (!queue.empty()) { + auto& proposal = queue.front(); + SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting, - proposal->TxId, 0, ctx, TabletID()); + proposal.TxId, 0, ctx, TabletID()); + queue.pop_front(); } }; if (VolatileState.Queue.Unsorted) { processQueue(*VolatileState.Queue.Unsorted); - VolatileState.Queue.Unsorted.Destroy(); + VolatileState.Queue.Unsorted.reset(); } - processQueue(*VolatileState.Queue.RapidSlot.Queue); + processQueue(VolatileState.Queue.RapidSlot); for (auto &kv : VolatileState.Queue.Low) { - processQueue(*kv.second.Queue); + processQueue(kv.second); } VolatileState.Queue.Low.clear(); } diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h index cd8968308f..e43e0026dc 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.h +++ b/ydb/core/tx/coordinator/coordinator_impl.h @@ -15,7 +15,6 @@ #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/scheme/scheme_types_defs.h> #include <ydb/core/tx/tx.h> -#include <ydb/core/util/queue_oneone_inplace.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -80,15 +79,7 @@ struct TCoordinatorStepConfirmations { {} }; - using TQ = TOneOneQueueInplace<TEntry *, 128>; - - const TAutoPtr<TQ, TQ::TPtrCleanDestructor> Queue; - const TStepId Step; - - TCoordinatorStepConfirmations(TStepId step) - : Queue(new TQ()) - , Step(step) - {} + std::deque<TEntry> Queue; }; struct TMediatorStep { @@ -100,7 +91,7 @@ struct TMediatorStep { ui64 Moderator; - TTx(TTxId txId, TTabletId *affected, ui32 affectedSize, ui64 moderator) + TTx(TTxId txId, const TTabletId *affected, ui32 affectedSize, ui64 moderator) : TxId(txId) , PushToAffected(affected, affected + affectedSize) , Moderator(moderator) @@ -110,16 +101,21 @@ struct TMediatorStep { TTx(TTxId txId) : TxId(txId) + , Moderator(0) { Y_VERIFY(TxId != 0); } }; - const TTabletId MediatorId; - const TStepId Step; + TTabletId MediatorId; + TStepId Step; bool Confirmed; + TVector<TTx> Transactions; + // Used by mediator queue to track acks + size_t References = 0; + TMediatorStep(TTabletId mediatorId, TStepId step) : MediatorId(mediatorId) , Step(step) @@ -127,6 +123,8 @@ struct TMediatorStep { {} }; +using TMediatorStepList = std::list<TMediatorStep>; + struct TMediatorConfirmations { const TTabletId MediatorId; @@ -137,6 +135,45 @@ struct TMediatorConfirmations { {} }; +struct TEvMediatorQueueStep : public TEventLocal<TEvMediatorQueueStep, TEvTxCoordinator::EvMediatorQueueStep> { + TMediatorStepList Steps; + + TEvMediatorQueueStep() = default; + + void SpliceStep(TMediatorStepList& list, TMediatorStepList::iterator it) { + Steps.splice(Steps.end(), list, it); + } +}; + +struct TEvMediatorQueueRestart : public TEventLocal<TEvMediatorQueueRestart, TEvTxCoordinator::EvMediatorQueueRestart> { + const ui64 MediatorId; + const ui64 StartFrom; + const ui64 GenCookie; + + TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie) + : MediatorId(mediatorId) + , StartFrom(startFrom) + , GenCookie(genCookie) + {} +}; + +struct TEvMediatorQueueStop : public TEventLocal<TEvMediatorQueueStop, TEvTxCoordinator::EvMediatorQueueStop> { + const ui64 MediatorId; + + TEvMediatorQueueStop(ui64 mediatorId) + : MediatorId(mediatorId) + {} +}; + +struct TEvMediatorQueueConfirmations : public TEventLocal<TEvMediatorQueueConfirmations, TEvTxCoordinator::EvMediatorQueueConfirmations> { + std::unique_ptr<NFlatTxCoordinator::TMediatorConfirmations> Confirmations; + + TEvMediatorQueueConfirmations(std::unique_ptr<NFlatTxCoordinator::TMediatorConfirmations> &&confirmations) + : Confirmations(std::move(confirmations)) + {} +}; + + IActor* CreateTxCoordinatorMediatorQueue(const TActorId &owner, ui64 coordinator, ui64 mediator, ui64 coordinatorGeneration); using NTabletFlatExecutor::TTabletExecutedFlat; @@ -253,8 +290,6 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat }; struct TQueueType { - typedef TOneOneQueueInplace<TTransactionProposal *, 512> TQ; - struct TFlowEntry { ui16 Weight; ui16 Limit; @@ -265,14 +300,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat {} }; - struct TSlot { - TAutoPtr<TQ, TQ::TPtrCleanDestructor> Queue; - ui64 QueueSize; + using TQueue = std::deque<TTransactionProposal>; - TSlot() - : Queue(new TQ()) - , QueueSize(0) - {} + struct TSlot : public TQueue { + using TQueue::TQueue; }; typedef TMap<TStepId, TSlot> TSlotQueue; @@ -280,15 +311,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat TSlotQueue Low; TSlot RapidSlot; // slot for entries with schedule on 'right now' moment (actually - with min-schedule time in past). - TAutoPtr<TQ, TQ::TPtrCleanDestructor> Unsorted; + std::optional<TSlot> Unsorted; TSlot& LowSlot(TStepId step) { - TMap<TStepId, TSlot>::iterator it = Low.find(step); - if (it != Low.end()) - return it->second; - std::pair<TMap<TStepId, TSlot>::iterator, bool> xit = Low.insert(TSlotQueue::value_type(step, TSlot())); - TSlot &ret = xit.first->second; - return ret; + return Low[step]; } TStepId MinLowSlot() const { @@ -308,7 +334,6 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat struct TTxSchema; struct TTxUpgrade; struct TTxPlanStep; - struct TTxRestartMediatorQueue; struct TTxMediatorConfirmations; struct TTxConsistencyCheck; struct TTxMonitoring; @@ -327,9 +352,9 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat const NKikimrSubDomains::TProcessingParams &config); ITransaction* CreateTxSchema(); ITransaction* CreateTxUpgrade(); - ITransaction* CreateTxPlanStep(TStepId toStep, TVector<TQueueType::TSlot> &slots); + ITransaction* CreateTxPlanStep(TStepId toStep, std::deque<TQueueType::TSlot> &&slots); ITransaction* CreateTxRestartMediatorQueue(TTabletId mediatorId, ui64 genCookie); - ITransaction* CreateTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations); + ITransaction* CreateTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations); ITransaction* CreateTxConsistencyCheck(); ITransaction* CreateTxMonitoring(NMon::TEvRemoteHttpInfo::TPtr& ev); ITransaction* CreateTxStopGuard(); @@ -349,18 +374,9 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat }; struct TMediator { - typedef TOneOneQueueInplace<TMediatorStep *, 32> TStepQueue; - TActorId QueueActor; - ui64 GenCookie; - bool PushUpdates; - - TAutoPtr<TStepQueue, TStepQueue::TPtrCleanDestructor> Queue; - - TMediator() - : GenCookie(0) - , PushUpdates(false) - {} + TMediatorStepList Queue; + bool Active = false; }; struct TTransaction { @@ -653,27 +669,26 @@ private: void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev); void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx); - void Handle(TEvTxCoordinator::TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx); - void Handle(TEvTxCoordinator::TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx); - void Handle(TEvTxCoordinator::TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx); - void Handle(TEvTxCoordinator::TEvCoordinatorConfirmPlan::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx); void Handle(TEvSubDomain::TEvConfigure::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx); + void SendStepConfirmations(TCoordinatorStepConfirmations &confirmations, const TActorContext &ctx); void DoConfiguration(const TEvSubDomain::TEvConfigure &ev, const TActorContext &ctx, const TActorId &ackTo = TActorId()); void Sync(ui64 mediator, const TActorContext &ctx); void Sync(const TActorContext &ctx); - void PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TActorContext &ctx); + void PlanTx(TTransactionProposal &&proposal, const TActorContext &ctx); bool AllowReducedPlanResolution() const; void SchedulePlanTick(); void SchedulePlanTickExact(ui64 next); void SchedulePlanTickAligned(ui64 next); - bool RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const; void TryInitMonCounters(const TActorContext &ctx); bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override; @@ -728,10 +743,9 @@ public: hFunc(TEvTxProxy::TEvSubscribeLastStep, Handle); hFunc(TEvTxProxy::TEvUnsubscribeLastStep, Handle); HFunc(TEvPrivate::TEvPlanTick, Handle); - HFunc(TEvTxCoordinator::TEvMediatorQueueConfirmations, Handle); - HFunc(TEvTxCoordinator::TEvMediatorQueueRestart, Handle); - HFunc(TEvTxCoordinator::TEvMediatorQueueStop, Handle); - HFunc(TEvTxCoordinator::TEvCoordinatorConfirmPlan, Handle); + HFunc(TEvMediatorQueueConfirmations, Handle); + HFunc(TEvMediatorQueueRestart, Handle); + HFunc(TEvMediatorQueueStop, Handle); HFunc(TEvTabletPipe::TEvServerConnected, Handle); HFunc(TEvTabletPipe::TEvServerDisconnected, Handle); HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle); diff --git a/ydb/core/tx/coordinator/coordinator_ut.cpp b/ydb/core/tx/coordinator/coordinator_ut.cpp index d7c673a5dc..7140b98a1d 100644 --- a/ydb/core/tx/coordinator/coordinator_ut.cpp +++ b/ydb/core/tx/coordinator/coordinator_ut.cpp @@ -32,9 +32,9 @@ namespace NKikimr::NFlatTxCoordinator::NTest { ui64 lastMediatorStep = 0; auto observeMediatorSteps = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { switch (ev->GetTypeRewrite()) { - case TEvTxCoordinator::TEvMediatorQueueStep::EventType: { - auto* msg = ev->Get<TEvTxCoordinator::TEvMediatorQueueStep>(); - ui64 step = msg->Step->Step; + case TEvMediatorQueueStep::EventType: { + auto* msg = ev->Get<TEvMediatorQueueStep>(); + ui64 step = msg->Steps.back().Step; lastMediatorStep = Max(lastMediatorStep, step); break; } @@ -476,6 +476,59 @@ namespace NKikimr::NFlatTxCoordinator::NTest { UNIT_ASSERT_C(hooks.PersistConfig_.empty(), "Unexpected persist attempt after a second reboot"); } + Y_UNIT_TEST(LastEmptyStepResent) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(1) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + + auto &runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG); + + auto sender = runtime.AllocateEdgeActor(); + ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain); + runtime.SimulateSleep(TDuration::Seconds(1)); + + std::vector<ui64> emptySteps; + auto stepsObserver = [&](auto&, auto& ev) { + switch (ev->GetTypeRewrite()) { + case TEvTxCoordinator::TEvCoordinatorStep::EventType: { + auto* msg = ev->template Get<TEvTxCoordinator::TEvCoordinatorStep>(); + ui64 step = msg->Record.GetStep(); + bool empty = msg->Record.TransactionsSize() == 0; + Cerr << "... observed " << step << ": " << (empty ? "empty" : "not empty") << Endl; + if (empty) { + emptySteps.push_back(step); + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(stepsObserver); + + runtime.SimulateSleep(TDuration::Seconds(10)); + UNIT_ASSERT_C(emptySteps.size() > 1, "Expected multiple empty steps, not " << emptySteps.size()); + ui64 lastObserved = emptySteps.back(); + emptySteps.clear(); + + RebootTablet(runtime, mediatorId, sender); + if (emptySteps.empty()) { + Cerr << "... waiting for empty steps" << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return !emptySteps.empty(); + }; + runtime.DispatchEvents(options); + } + UNIT_ASSERT_C(!emptySteps.empty(), "Expected at least one empty step"); + UNIT_ASSERT_C(emptySteps.front() == lastObserved, + "Expected to observe " << lastObserved << " empty step, not " << emptySteps.front()); + } + } // Y_UNIT_TEST_SUITE(Coordinator) diff --git a/ydb/core/tx/coordinator/mediator_queue.cpp b/ydb/core/tx/coordinator/mediator_queue.cpp index 9b3955fa1b..4195c1cbe8 100644 --- a/ydb/core/tx/coordinator/mediator_queue.cpp +++ b/ydb/core/tx/coordinator/mediator_queue.cpp @@ -5,6 +5,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h> namespace NKikimr { namespace NFlatTxCoordinator { @@ -17,11 +18,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi const ui64 CoordinatorGeneration; TActorId PipeClient; - ui64 GenCookie; - ui64 PrevStep; + ui64 GenCookie = 0; + ui64 PrevStep = 0; + bool Active = false; - TAutoPtr<TMediatorConfirmations> Confirmations; + TMediatorStepList Queue; + THashMap<std::pair<ui64, ui64>, TMediatorStepList::iterator> WaitingAcks; + + std::unique_ptr<TMediatorConfirmations> Confirmations; void Die(const TActorContext &ctx) override { if (PipeClient) { @@ -45,6 +50,57 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi << " tablet# " << Coordinator << " SEND EvCoordinatorSync to# " << Mediator << " Mediator"); NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorSync(++GenCookie, Mediator, Coordinator)); Become(&TThis::StateSync); + Active = false; + } + + void SendStep(const TMediatorStep &step, const TActorContext &ctx) { + LOG_DEBUG(ctx, NKikimrServices::TX_COORDINATOR_PRIVATE, "[%" PRIu64 "] to [%" PRIu64 "], step [%" PRIu64 "]", Coordinator, Mediator, step.Step); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() + << " tablet# " << Coordinator << " SEND to# " << Mediator << " Mediator TEvCoordinatorStep"); + NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorStep( + step, PrevStep, Mediator, Coordinator, CoordinatorGeneration)); + PrevStep = step.Step; + } + + void SendConfirmations(const TActorContext &ctx) { + if (Confirmations) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() + << " tablet# " << Coordinator << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString() + << " Owner"); + ctx.Send(Owner, new TEvMediatorQueueConfirmations(std::move(Confirmations))); + } + } + + /** + * Filters step by removing txs/shards that have already been acknowledged + */ + void FilterAcked(TMediatorStep &step) { + auto end = std::remove_if( + step.Transactions.begin(), + step.Transactions.end(), + [this](TMediatorStep::TTx& tx) -> bool { + ui64 txId = tx.TxId; + auto end = std::remove_if( + tx.PushToAffected.begin(), + tx.PushToAffected.end(), + [this, txId](ui64 shardId) -> bool { + // Remove shards that have been acknowledged (not in a waiting set) + return WaitingAcks.find(std::make_pair(txId, shardId)) == WaitingAcks.end(); + }); + if (end != tx.PushToAffected.end()) { + tx.PushToAffected.erase(end, tx.PushToAffected.end()); + } + // Remove transactions without any shards left + return tx.PushToAffected.empty(); + }); + if (end != step.Transactions.end()) { + step.Transactions.erase(end, step.Transactions.end()); + // Note: fully acked steps are removed immediately + if (Y_UNLIKELY(step.Transactions.empty())) { + step.Transactions.shrink_to_fit(); + } + } } void Handle(TEvTxCoordinator::TEvCoordinatorSyncResult::TPtr &ev, const TActorContext &ctx) { @@ -62,34 +118,55 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi PrevStep = 0; LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() << " tablet# " << Coordinator << " SEND EvMediatorQueueRestart to# " << Owner.ToString() << " Owner"); - ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueRestart(Mediator, 0, ++GenCookie)); + ctx.Send(Owner, new TEvMediatorQueueRestart(Mediator, 0, ++GenCookie)); Become(&TThis::StateWork); - } - - void Handle(TEvTxCoordinator::TEvMediatorQueueStep::TPtr &ev, const TActorContext &ctx) { - TEvTxCoordinator::TEvMediatorQueueStep *msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() - << " HANDLE EvMediatorQueueStep step# " << msg->Step->Step); - if (msg->GenCookie == GenCookie && PipeClient) { - const NFlatTxCoordinator::TMediatorStep &step = *msg->Step; + Active = true; + for (auto& step : Queue) { + FilterAcked(step); + // TODO: limit the number of steps inflight + SendStep(step, ctx); + } + } - LOG_DEBUG(ctx, NKikimrServices::TX_COORDINATOR_PRIVATE, "[%" PRIu64 "] to [%" PRIu64 "], step [%" PRIu64 "]", Coordinator, Mediator, step.Step); + void Handle(TEvMediatorQueueStep::TPtr &ev, const TActorContext &ctx) { + TEvMediatorQueueStep *msg = ev->Get(); + while (!msg->Steps.empty()) { + auto it = msg->Steps.begin(); LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() - << " tablet# " << Coordinator << " SEND to# " << Mediator << " Mediator TEvCoordinatorStep"); - NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorStep( - step, PrevStep, Mediator, Coordinator, CoordinatorGeneration)); - PrevStep = step.Step; + << " HANDLE EvMediatorQueueStep step# " << it->Step); + + // Remove the last empty step (empty steps except the last one are not needed) + if (!Queue.empty() && Queue.back().Transactions.empty()) { + Queue.pop_back(); + } + + Queue.splice(Queue.end(), msg->Steps, it); + + // Index by TxId/ShardId pairs while waiting for acks + Y_VERIFY(it->References == 0); + for (auto& tx : it->Transactions) { + for (ui64 shardId : tx.PushToAffected) { + auto res = WaitingAcks.insert( + std::make_pair( + std::make_pair(tx.TxId, shardId), + it)); + Y_VERIFY_DEBUG(res.second); + if (res.second) { + it->References++; + } + } + } + + if (Active) { + // TODO: limit the number of steps inflight + SendStep(*it, ctx); + } } - if (Confirmations) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() - << " tablet# " << Coordinator << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString() - << " Owner"); - ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueConfirmations(Confirmations)); - } + SendConfirmations(ctx); } void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) { @@ -112,10 +189,11 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi << " HANDLE EvClientDestroyed"); if (msg->ClientId != PipeClient) return; + Active = false; LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() << " tablet# " << Coordinator << " SEND EvMediatorQueueStop to# " << Owner.ToString() << " Owner Mediator# " << Mediator); - ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueStop(Mediator)); + ctx.Send(Owner, new TEvMediatorQueueStop(Mediator)); Sync(ctx); } @@ -124,12 +202,36 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString() << " HANDLE EvPlanStepAck"); - if (!Confirmations) - Confirmations.Reset(new TMediatorConfirmations(Mediator)); + bool fullyConfirmed = false; const TTabletId tabletId = record.GetTabletId(); for (const auto txid : record.GetTxId()) { - Confirmations->Acks[txid].insert(tabletId); + auto it = WaitingAcks.find(std::make_pair(txid, tabletId)); + if (it != WaitingAcks.end()) { + Y_VERIFY(it->second->References > 0); + + if (!Confirmations) + Confirmations.reset(new TMediatorConfirmations(Mediator)); + Confirmations->Acks[txid].insert(tabletId); + + if (0 == --it->second->References) { + Y_VERIFY(!Queue.empty()); + auto last = --Queue.end(); + if (it->second != last) { + Queue.erase(it->second); + } else { + // On reconnect we will resend it as empty + it->second->Transactions.clear(); + it->second->Transactions.shrink_to_fit(); + } + fullyConfirmed = true; + } + WaitingAcks.erase(it); + } + } + + if (fullyConfirmed) { + SendConfirmations(ctx); } } @@ -143,9 +245,6 @@ public: , Coordinator(coordinator) , Mediator(mediator) , CoordinatorGeneration(coordinatorGeneration) - , PipeClient() - , GenCookie(0) - , PrevStep(0) {} void Bootstrap(const TActorContext &ctx) { @@ -165,7 +264,7 @@ public: STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { HFunc(TEvTxProcessing::TEvPlanStepAck, Handle); - HFunc(TEvTxCoordinator::TEvMediatorQueueStep, Handle); + HFunc(TEvMediatorQueueStep, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); CFunc(TEvents::TSystem::PoisonPill, Die) } diff --git a/ydb/core/tx/coordinator/ya.make b/ydb/core/tx/coordinator/ya.make index efd565a3db..d5377a15af 100644 --- a/ydb/core/tx/coordinator/ya.make +++ b/ydb/core/tx/coordinator/ya.make @@ -14,7 +14,6 @@ SRCS( coordinator__plan_step.cpp coordinator__read_step_subscriptions.cpp coordinator__restore_params.cpp - coordinator__restart_mediator.cpp coordinator__restore_transaction.cpp coordinator__schema.cpp coordinator__schema_upgrade.cpp @@ -27,6 +26,7 @@ PEERDIR( library/cpp/actors/core library/cpp/actors/helpers library/cpp/actors/interconnect + library/cpp/containers/absl_flat_hash ydb/core/actorlib_impl ydb/core/base ydb/core/engine/minikql |