aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-07-25 20:04:06 +0300
committerroot <root@qavm-2ed34686.qemu>2023-07-25 20:04:06 +0300
commitd49935cc4b833d7adefe349d251a8705fbc8e6fe (patch)
treecb5686f3908e75f308614e6abdd5d1c0a0680af2
parentdd49b96fb21fbea107931e3ca4f1d9b292241969 (diff)
downloadydb-d49935cc4b833d7adefe349d251a8705fbc8e6fe.tar.gz
Use in-memory mediator queue with local resends KIKIMR-18580
-rw-r--r--ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/coordinator/coordinator.cpp23
-rw-r--r--ydb/core/tx/coordinator/coordinator.h43
-rw-r--r--ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp10
-rw-r--r--ydb/core/tx/coordinator/coordinator__plan_step.cpp107
-rw-r--r--ydb/core/tx/coordinator/coordinator__restart_mediator.cpp111
-rw-r--r--ydb/core/tx/coordinator/coordinator__restore_transaction.cpp79
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.cpp177
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.h120
-rw-r--r--ydb/core/tx/coordinator/coordinator_ut.cpp59
-rw-r--r--ydb/core/tx/coordinator/mediator_queue.cpp161
-rw-r--r--ydb/core/tx/coordinator/ya.make2
15 files changed, 489 insertions, 411 deletions
diff --git a/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt
index 6b5e5d1b2b..8b3bb6b698 100644
--- a/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/coordinator/CMakeLists.darwin-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(core-tx-coordinator PUBLIC
cpp-actors-core
cpp-actors-helpers
cpp-actors-interconnect
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-engine-minikql
@@ -39,7 +40,6 @@ target_sources(core-tx-coordinator PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp
diff --git a/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt b/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt
index 6e0e3565fd..5dd15ca26c 100644
--- a/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/coordinator/CMakeLists.linux-aarch64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-tx-coordinator PUBLIC
cpp-actors-core
cpp-actors-helpers
cpp-actors-interconnect
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-engine-minikql
@@ -40,7 +41,6 @@ target_sources(core-tx-coordinator PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp
diff --git a/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt
index 6e0e3565fd..5dd15ca26c 100644
--- a/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/coordinator/CMakeLists.linux-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-tx-coordinator PUBLIC
cpp-actors-core
cpp-actors-helpers
cpp-actors-interconnect
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-engine-minikql
@@ -40,7 +41,6 @@ target_sources(core-tx-coordinator PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp
diff --git a/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt b/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt
index 6b5e5d1b2b..8b3bb6b698 100644
--- a/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/coordinator/CMakeLists.windows-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(core-tx-coordinator PUBLIC
cpp-actors-core
cpp-actors-helpers
cpp-actors-interconnect
+ cpp-containers-absl_flat_hash
ydb-core-actorlib_impl
ydb-core-base
core-engine-minikql
@@ -39,7 +40,6 @@ target_sources(core-tx-coordinator PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__plan_step.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__read_step_subscriptions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_params.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/coordinator/coordinator__schema_upgrade.cpp
diff --git a/ydb/core/tx/coordinator/coordinator.cpp b/ydb/core/tx/coordinator/coordinator.cpp
index 3e76fe192e..bdcabefbb3 100644
--- a/ydb/core/tx/coordinator/coordinator.cpp
+++ b/ydb/core/tx/coordinator/coordinator.cpp
@@ -18,10 +18,6 @@ NKikimr::TEvTxCoordinator::TEvCoordinatorStep::TEvCoordinatorStep(const NFlatTxC
}
}
-NKikimr::TEvTxCoordinator::TEvMediatorQueueConfirmations::TEvMediatorQueueConfirmations(TAutoPtr<NKikimr::NFlatTxCoordinator::TMediatorConfirmations> &confirmations)
-: Confirmations(confirmations)
-{}
-
NKikimr::TEvTxCoordinator::TEvCoordinatorStepResult::TEvCoordinatorStepResult(NKikimrTx::TEvCoordinatorStepResult::EStatus status, ui64 step, ui64 completeStep, ui64 latestKnown, ui64 subjectiveTime, ui64 mediator, ui64 coordinator)
{
Record.SetStatus(status);
@@ -55,22 +51,3 @@ NKikimr::TEvTxCoordinator::TEvCoordinatorSyncResult::TEvCoordinatorSyncResult(ui
Record.SetMediatorID(mediator);
Record.SetCoordinatorID(coordinator);
}
-
-NKikimr::TEvTxCoordinator::TEvMediatorQueueStep::TEvMediatorQueueStep(ui64 genCookie, TAutoPtr<NKikimr::NFlatTxCoordinator::TMediatorStep> step)
-: GenCookie(genCookie)
-, Step(step)
-{}
-
-NKikimr::TEvTxCoordinator::TEvMediatorQueueRestart::TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie)
-: MediatorId(mediatorId)
-, StartFrom(startFrom)
-, GenCookie(genCookie)
-{}
-
-NKikimr::TEvTxCoordinator::TEvMediatorQueueStop::TEvMediatorQueueStop(ui64 mediatorId)
-: MediatorId(mediatorId)
-{}
-
-NKikimr::TEvTxCoordinator::TEvCoordinatorConfirmPlan::TEvCoordinatorConfirmPlan(TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> &confirmations)
-: Confirmations(confirmations)
-{}
diff --git a/ydb/core/tx/coordinator/coordinator.h b/ydb/core/tx/coordinator/coordinator.h
index 771fed39ca..a7b03f5cac 100644
--- a/ydb/core/tx/coordinator/coordinator.h
+++ b/ydb/core/tx/coordinator/coordinator.h
@@ -1,7 +1,6 @@
#pragma once
#include "defs.h"
#include <ydb/core/tx/tx.h>
-#include <ydb/core/util/queue_oneone_inplace.h>
#include <util/generic/bitmap.h>
#include <util/generic/set.h>
#include <util/generic/hash.h>
@@ -9,15 +8,11 @@
#include <util/generic/hash_set.h>
namespace NKikimr {
+
namespace NFlatTxCoordinator {
struct TMediatorStep;
- struct TMediatorConfirmations;
- struct TCoordinatorStepConfirmations;
-}
}
-namespace NKikimr {
-
IActor* CreateFlatTxCoordinator(const TActorId &tablet, TTabletStorageInfo *info);
struct TEvTxCoordinator {
@@ -33,7 +28,7 @@ struct TEvTxCoordinator {
EvMediatorQueueStop,
EvMediatorQueueConfirmations,
- EvCoordinatorConfirmPlan = EvCoordinatorStep + 3 * 512,
+ // deprecated: EvCoordinatorConfirmPlan = EvCoordinatorStep + 3 * 512,
EvEnd
};
@@ -73,40 +68,6 @@ struct TEvTxCoordinator {
};
- // must be explicit queue?
- struct TEvMediatorQueueStep : public TEventLocal<TEvMediatorQueueStep, EvMediatorQueueStep> {
- const ui64 GenCookie;
- TAutoPtr<NFlatTxCoordinator::TMediatorStep> Step;
-
- TEvMediatorQueueStep(ui64 genCookie, TAutoPtr<NFlatTxCoordinator::TMediatorStep> step);
- };
-
- struct TEvMediatorQueueRestart : public TEventLocal<TEvMediatorQueueRestart, EvMediatorQueueRestart> {
- const ui64 MediatorId;
- const ui64 StartFrom;
- const ui64 GenCookie;
-
- TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie);
- };
-
- struct TEvMediatorQueueStop : public TEventLocal<TEvMediatorQueueStop, EvMediatorQueueStop> {
- const ui64 MediatorId;
-
- TEvMediatorQueueStop(ui64 mediatorId);
- };
-
- struct TEvMediatorQueueConfirmations : public TEventLocal<TEvMediatorQueueConfirmations, EvMediatorQueueConfirmations> {
- TAutoPtr<NFlatTxCoordinator::TMediatorConfirmations> Confirmations;
-
- TEvMediatorQueueConfirmations(TAutoPtr<NFlatTxCoordinator::TMediatorConfirmations> &confirmations);
- };
-
- struct TEvCoordinatorConfirmPlan : public TEventLocal<TEvCoordinatorConfirmPlan, EvCoordinatorConfirmPlan> {
- TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> Confirmations;
-
- TEvCoordinatorConfirmPlan(TAutoPtr<NFlatTxCoordinator::TCoordinatorStepConfirmations> &confirmations);
- };
-
};
}
diff --git a/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp b/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp
index 28a644503d..3f15ce76a7 100644
--- a/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp
+++ b/ydb/core/tx/coordinator/coordinator__mediators_confirmations.cpp
@@ -8,12 +8,12 @@ namespace NKikimr {
namespace NFlatTxCoordinator {
struct TTxCoordinator::TTxMediatorConfirmations : public TTransactionBase<TTxCoordinator> {
- TAutoPtr<TMediatorConfirmations> Confirmations;
+ std::unique_ptr<TMediatorConfirmations> Confirmations;
i64 CompleteTransactions;
- TTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations, TSelf *coordinator)
+ TTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations, TSelf *coordinator)
: TBase(coordinator)
- , Confirmations(confirmations)
+ , Confirmations(std::move(confirmations))
, CompleteTransactions(0)
{}
@@ -82,8 +82,8 @@ struct TTxCoordinator::TTxMediatorConfirmations : public TTransactionBase<TTxCoo
}
};
-ITransaction* TTxCoordinator::CreateTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations) {
- return new TTxMediatorConfirmations(confirmations, this);
+ITransaction* TTxCoordinator::CreateTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations) {
+ return new TTxMediatorConfirmations(std::move(confirmations), this);
}
}
diff --git a/ydb/core/tx/coordinator/coordinator__plan_step.cpp b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
index b328d015c4..82db717845 100644
--- a/ydb/core/tx/coordinator/coordinator__plan_step.cpp
+++ b/ydb/core/tx/coordinator/coordinator__plan_step.cpp
@@ -19,24 +19,24 @@ struct TInFlyAccountant {
struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
const ui64 PlanOnStep;
- TVector<TQueueType::TSlot> Slots;
+ std::deque<TQueueType::TSlot> Slots;
- TMap<ui64, std::pair<ui64, bool *>> StepsToConfirm;
- TAutoPtr<TCoordinatorStepConfirmations> ProxyPlanConfirmations;
+ TMap<ui64, TMediatorStepList::iterator> StepsToConfirm;
+ TCoordinatorStepConfirmations ProxyPlanConfirmations;
TInstant ExecStartMoment;
ui64 PlannedCounter;
ui64 DeclinedCounter;
TInFlyAccountant InFlyAccountant;
- TTxPlanStep(ui64 toPlan, TVector<TQueueType::TSlot> &slots, TSelf *coordinator)
+ TTxPlanStep(ui64 toPlan, std::deque<TQueueType::TSlot> &&slots, TSelf *coordinator)
: TBase(coordinator)
, PlanOnStep(toPlan)
+ , Slots(std::move(slots))
, PlannedCounter(0)
, DeclinedCounter(0)
, InFlyAccountant(Self->MonCounters.StepsInFly)
{
- Slots.swap(slots);
}
void Plan(TTransactionContext &txc, const TActorContext &ctx) {
@@ -46,40 +46,37 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
const bool lowDiskSpace = Self->Executor()->GetStats().IsAnyChannelYellowStop;
THashSet<TTxId> newTransactions;
- TVector<TAutoPtr<TMediatorStep>> mediatorSteps;
+ TVector<TMediatorStep> mediatorSteps;
THashMap<TTabletId, TVector<TTabletId>> byMediatorAffected;
// first fill every mediator with something (every mediator must receive step)
const ui32 mediatorsSize = Self->Config.Mediators->List().size();
mediatorSteps.reserve(mediatorsSize);
for (TTabletId mediatorId : Self->Config.Mediators->List()) {
- mediatorSteps.push_back(new TMediatorStep(mediatorId, PlanOnStep));
+ mediatorSteps.emplace_back(mediatorId, PlanOnStep);
}
// create mediator steps
- ProxyPlanConfirmations.Reset(new TCoordinatorStepConfirmations(PlanOnStep));
- for (const auto &slot : Slots) {
- TQueueType::TQ &queue = *slot.Queue;
- TQueueType::TQ::TReadIterator iterator = queue.Iterator();
- while (TTransactionProposal *proposal = iterator.Next()) {
+ for (auto &slot : Slots) {
+ for (auto &proposal : slot) {
for (auto &x : byMediatorAffected) {
x.second.clear();
}
- const TTxId txId = proposal->TxId;
+ const TTxId txId = proposal.TxId;
Y_VERIFY(txId);
Self->MonCounters.StepConsideredTx->Inc();
- auto durationMs = (ExecStartMoment - proposal->AcceptMoment).MilliSeconds();
+ auto durationMs = (ExecStartMoment - proposal.AcceptMoment).MilliSeconds();
Self->MonCounters.TxFromReceiveToPlan->Collect(durationMs);
- if (proposal->MaxStep < PlanOnStep) {
+ if (proposal.MaxStep < PlanOnStep) {
Self->MonCounters.StepOutdatedTx->Inc();
- ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry {
+ ProxyPlanConfirmations.Queue.emplace_back(
txId,
- proposal->Proxy,
+ proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated,
- 0 });
+ 0);
++DeclinedCounter;
continue;
}
@@ -87,11 +84,11 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
// check is transactions already processed?
if (newTransactions.insert(txId).second == false) {
Self->MonCounters.StepPlannedDeclinedTx->Inc();
- ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry {
+ ProxyPlanConfirmations.Queue.emplace_back(
txId,
- proposal->Proxy,
+ proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned,
- PlanOnStep });
+ PlanOnStep);
++DeclinedCounter;
continue;
}
@@ -100,24 +97,23 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
auto it = Self->Transactions.find(txId);
if (it != Self->Transactions.end()) {
Self->MonCounters.StepPlannedDeclinedTx->Inc();
- ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry {
+ ProxyPlanConfirmations.Queue.emplace_back(
txId,
- proposal->Proxy,
+ proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned,
- it->second.PlanOnStep });
+ it->second.PlanOnStep);
++DeclinedCounter;
continue;
}
}
- if (lowDiskSpace && !proposal->IgnoreLowDiskSpace) {
+ if (lowDiskSpace && !proposal.IgnoreLowDiskSpace) {
Self->MonCounters.StepDeclinedNoSpaceTx->Inc();
- ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry{
+ ProxyPlanConfirmations.Queue.emplace_back(
txId,
- proposal->Proxy,
+ proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclinedNoSpace,
- 0
- });
+ 0);
++DeclinedCounter;
continue;
}
@@ -128,8 +124,8 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
TTransaction& transaction = Self->Transactions[txId];
transaction.PlanOnStep = PlanOnStep;
- Y_VERIFY(!proposal->AffectedSet.empty());
- for (const auto &txprop : proposal->AffectedSet) {
+ Y_VERIFY(!proposal.AffectedSet.empty());
+ for (const auto &txprop : proposal.AffectedSet) {
const TTabletId affectedTablet = txprop.TabletId;
const TTabletId mediatorId = Self->Config.Mediators->Select(affectedTablet);
@@ -149,10 +145,10 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
}
for (ui32 idx = 0; idx < mediatorsSize; ++idx) {
- TTabletId mediatorId = mediatorSteps[idx]->MediatorId;
+ TTabletId mediatorId = mediatorSteps[idx].MediatorId;
TVector<TTabletId> &affected = byMediatorAffected[mediatorId];
if (!affected.empty()) {
- mediatorSteps[idx]->Transactions.push_back(TMediatorStep::TTx(txId, &affected.front(), affected.size(), 0));
+ mediatorSteps[idx].Transactions.emplace_back(txId, affected.data(), affected.size(), 0);
}
}
@@ -160,19 +156,19 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
++PlannedCounter;
Self->MonCounters.StepPlannedTx->Inc();
- ProxyPlanConfirmations->Queue->Push(new TCoordinatorStepConfirmations::TEntry {
+ ProxyPlanConfirmations.Queue.emplace_back(
txId,
- proposal->Proxy,
+ proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned,
- PlanOnStep } );
+ PlanOnStep);
}
}
- for (const TAutoPtr<TMediatorStep> &mp : mediatorSteps) {
- const ui64 mediatorId = mp->MediatorId;
+ for (TMediatorStep& m : mediatorSteps) {
+ const ui64 mediatorId = m.MediatorId;
// write mediator entry
- for (const auto &tx : mp->Transactions) {
+ for (const auto &tx : m.Transactions) {
for (TTabletId tablet : tx.PushToAffected) {
db.Table<Schema::AffectedSet>().Key(mediatorId, tx.TxId, tablet).Update();
FLOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "Planned transaction " << tx.TxId << " for mediator " << mediatorId << " tablet " << tablet);
@@ -180,12 +176,12 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
}
TMediator& mediator = Self->Mediator(mediatorId, ctx);
- if (mediator.PushUpdates) {
- StepsToConfirm[mediatorId] = std::pair<ui64, bool *>(mediator.GenCookie, &mp->Confirmed);
- mediator.Queue->Push(mp.Release());
- } else if (!StepsToConfirm.empty()) {
- FLOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "PushUpdates false for mediator " << mediatorId << " step " << PlanOnStep);
+ if (!mediator.Queue.empty() && mediator.Queue.back().Confirmed && mediator.Queue.back().Transactions.empty()) {
+ // Remove the last confirmed empty step
}
+ mediator.Queue.emplace_back(std::move(m));
+ auto it = --mediator.Queue.end();
+ StepsToConfirm[mediatorId] = it;
}
db.Table<Schema::State>().Key(Schema::State::KeyLastPlanned).Update(NIceDb::TUpdate<Schema::State::StateValue>(PlanOnStep));
}
@@ -210,24 +206,31 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
auto durationMs = (ctx.Now() - ExecStartMoment).MilliSeconds();
Self->MonCounters.TxPlanLatency->Collect(durationMs);
- for (auto &cx : StepsToConfirm) {
- const ui64 mediatorId = cx.first;
+ for (auto &pr : StepsToConfirm) {
+ const ui64 mediatorId = pr.first;
TMediator &mediator = Self->Mediator(mediatorId, ctx);
- if (mediator.GenCookie == cx.second.first) {
- *cx.second.second = true;
- Self->SendMediatorStep(mediator, ctx);
+ Y_VERIFY(!mediator.Queue.empty());
+ pr.second->Confirmed = true;
+ for (auto it = pr.second; it != mediator.Queue.begin();) {
+ --it;
+ if (!it->Confirmed) break;
+ if (!it->Transactions.empty()) break;
+ // Remove empty confirmed steps before us
+ // Needed so the queue does not grow for disconnected mediators
+ mediator.Queue.erase(it++);
}
+ Self->SendMediatorStep(mediator, ctx);
}
- ctx.Send(ctx.SelfID, new TEvTxCoordinator::TEvCoordinatorConfirmPlan(ProxyPlanConfirmations));
+ Self->SendStepConfirmations(ProxyPlanConfirmations, ctx);
// uncomment this to enable consistency self-check
//Self->Execute(Self->CreateTxConsistencyCheck(), ctx);
}
};
-ITransaction* TTxCoordinator::CreateTxPlanStep(ui64 toStep, TVector<TQueueType::TSlot> &slots) {
- return new TTxPlanStep(toStep, slots, this);
+ITransaction* TTxCoordinator::CreateTxPlanStep(ui64 toStep, std::deque<TQueueType::TSlot> &&slots) {
+ return new TTxPlanStep(toStep, std::move(slots), this);
}
}
diff --git a/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp b/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
deleted file mode 100644
index 673115d22b..0000000000
--- a/ydb/core/tx/coordinator/coordinator__restart_mediator.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-#include "coordinator_impl.h"
-
-namespace NKikimr {
-namespace NFlatTxCoordinator {
-
-struct TTxCoordinator::TTxRestartMediatorQueue : public TTransactionBase<TTxCoordinator> {
- const TTabletId MediatorId;
- const ui64 GenCookie;
-
- TVector<bool *> StepsToConfirm;
-
- TTxRestartMediatorQueue(ui64 mediatorId, ui64 genCookie, TSelf *coordinator)
- : TBase(coordinator)
- , MediatorId(mediatorId)
- , GenCookie(genCookie)
- {}
-
- TTxType GetTxType() const override { return TXTYPE_RESTART_MEDIATOR; }
-
- bool Execute(TTransactionContext &txc, const TActorContext &ctx) override {
- TMediator &mediator = Self->Mediator(MediatorId, ctx);
- if (mediator.GenCookie != GenCookie)
- return true;
-
- THashMap<TTxId,TVector<TTabletId>> pushToAffectedBuffer;
- TVector<TAutoPtr<TMediatorStep>> mediatorSteps;
-
- if (!Self->RestoreMediatorInfo(MediatorId, mediatorSteps, txc, pushToAffectedBuffer))
- return false;
-
- for (const auto& it : pushToAffectedBuffer) {
- TTransaction& transaction = Self->Transactions[it.first];
- THashSet<TTabletId>& unconfirmedAffectedSet = transaction.UnconfirmedAffectedSet[MediatorId];
- Y_VERIFY(unconfirmedAffectedSet.size() == it.second.size(),
- "Incosistent affected set in mem in DB for txId %" PRIu64, it.first);
- for (const TTabletId affectedTabletId : it.second) {
- Y_VERIFY(unconfirmedAffectedSet.contains(affectedTabletId),
- "Incosistent affected set in mem in DB for txId %" PRIu64 " missing tabletId %" PRIu64,
- it.first, affectedTabletId);
- }
- }
-
- for (const auto &mp : mediatorSteps) {
- StepsToConfirm.push_back(&mp->Confirmed);
- mediator.Queue->Push(mp.Release());
- }
-
- mediator.PushUpdates = true;
- return true;
- }
-
- void Complete(const TActorContext &ctx) override {
- TMediator &mediator = Self->Mediator(MediatorId, ctx);
- if (mediator.GenCookie != GenCookie)
- return;
-
- for (bool *x : StepsToConfirm)
- *x = true;
-
- Self->SendMediatorStep(mediator, ctx);
- }
-};
-
-ITransaction* TTxCoordinator::CreateTxRestartMediatorQueue(TTabletId mediatorId, ui64 genCookie) {
- return new TTxRestartMediatorQueue(mediatorId, genCookie, this);
-}
-
-bool TTxCoordinator::RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const {
- NIceDb::TNiceDb db(txc.DB);
- pushToAffected.clear();
- planned.clear();
-
- auto rowset = db.Table<Schema::AffectedSet>().Range(mediatorId).Select();
- if (!rowset.IsReady())
- return false;
-
- // Later we will need this to be sorted by stepId
- TMap<TStepId, TAutoPtr<TMediatorStep>> mediatorSteps;
-
- while (!rowset.EndOfSet()) {
- const TTxId txId = rowset.GetValue<Schema::AffectedSet::TransactionID>();
- auto itTransaction = Transactions.find(txId);
- Y_VERIFY(itTransaction != Transactions.end());
-
- TStepId step = itTransaction->second.PlanOnStep;
- auto itStep = mediatorSteps.find(step);
- if (itStep == mediatorSteps.end()) {
- itStep = mediatorSteps.insert(std::make_pair(step, new TMediatorStep(mediatorId, step))).first;
- }
- TAutoPtr<TMediatorStep>& mediatorStep = itStep->second;
-
- if (mediatorStep->Transactions.empty() || mediatorStep->Transactions.back().TxId != txId)
- mediatorStep->Transactions.push_back(TMediatorStep::TTx(txId));
- TMediatorStep::TTx &tx = mediatorStep->Transactions.back();
-
- TTabletId tablet = rowset.GetValue<Schema::AffectedSet::DataShardID>();
- pushToAffected[txId].push_back(tablet);
- tx.PushToAffected.push_back(tablet);
- tx.Moderator = 0;
- if (!rowset.Next())
- return false;
- }
-
- for (auto& pr : mediatorSteps)
- planned.push_back(pr.second);
-
- return true;
-}
-
-}
-}
diff --git a/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp b/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
index 53117e2a57..97d428d77e 100644
--- a/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
+++ b/ydb/core/tx/coordinator/coordinator__restore_transaction.cpp
@@ -13,6 +13,52 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
: TBase(coordinator)
{}
+ struct TMediatorState {
+ TMediatorStepList Steps;
+ THashMap<TStepId, TMediatorStepList::iterator> Index;
+ };
+
+ THashMap<TTabletId, TMediatorState> Mediators;
+
+ TMediatorStep& GetMediatorStep(TTabletId mediatorId, TStepId step) {
+ auto& state = Mediators[mediatorId];
+
+ auto it = state.Index.find(step);
+ if (it != state.Index.end()) {
+ return *it->second;
+ }
+
+ auto& entry = state.Steps.emplace_back(mediatorId, step);
+ state.Index[step] = --state.Steps.end();
+ return entry;
+ }
+
+ TMediatorStep::TTx& GetMediatorTx(TTabletId mediatorId, TStepId step, TTxId txId) {
+ auto& medStep = GetMediatorStep(mediatorId, step);
+ if (medStep.Transactions.empty() || medStep.Transactions.back().TxId < txId) {
+ return medStep.Transactions.emplace_back(txId);
+ }
+ auto& medTx = medStep.Transactions.back();
+ Y_VERIFY_S(medTx.TxId == txId, "Transaction loading must be ordered by TxId:"
+ << " Mediator# " << mediatorId
+ << " step# " << step
+ << " TxId# " << txId
+ << " PrevTxId# " << medTx.TxId);
+ return medTx;
+ }
+
+ void EnsureLastMediatorStep(TTabletId mediatorId, TStepId step) {
+ auto& state = Mediators[mediatorId];
+ if (!state.Steps.empty()) {
+ auto it = --state.Steps.end();
+ if (step <= it->Step) {
+ return; // nothing to do
+ }
+ }
+ state.Steps.emplace_back(mediatorId, step);
+ state.Index[step] = --state.Steps.end();
+ }
+
bool Restore(TTransactions &transactions, TTransactionContext &txc, const TActorContext &ctx) {
Y_UNUSED(ctx);
NIceDb::TNiceDb db(txc.DB);
@@ -35,6 +81,8 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
}
}
+ Mediators.clear();
+
{
int errors = 0;
auto rowset = db.Table<Schema::AffectedSet>().Range().Select();
@@ -48,7 +96,10 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
auto itTransaction = transactions.find(txId);
if (itTransaction != transactions.end()) {
- itTransaction->second.UnconfirmedAffectedSet[medId].insert(affectedShardId);
+ auto& tx = itTransaction->second;
+ tx.UnconfirmedAffectedSet[medId].insert(affectedShardId);
+ auto& medTx = GetMediatorTx(medId, tx.PlanOnStep, txId);
+ medTx.PushToAffected.push_back(affectedShardId);
} else {
LOG_ERROR_S(ctx, NKikimrServices::TX_COORDINATOR, "Transaction not found: MedId = " << medId << " TxId = " << txId << " DataShardId = " << affectedShardId);
++errors;
@@ -85,14 +136,36 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
Self->Transactions.swap(transactions);
*Self->MonCounters.TxInFly += txCounter;
Self->MonCounters.CurrentTxInFly = txCounter;
+
+ // Prepare mediator queues
+ for (ui64 mediatorId : Self->Config.Mediators->List()) {
+ auto& state = Mediators[mediatorId];
+ // We need to slice steps in a sorted order
+ state.Steps.sort([](const TMediatorStep& a, const TMediatorStep& b) -> bool {
+ return a.Step < b.Step;
+ });
+ // Make sure each mediator will receive last planned step
+ if (Self->VolatileState.LastPlanned != 0) {
+ EnsureLastMediatorStep(mediatorId, Self->VolatileState.LastPlanned);
+ }
+ // Splice all steps to the queue
+ TMediator& mediator = Self->Mediator(mediatorId, ctx);
+ Y_VERIFY(mediator.Queue.empty());
+ mediator.Queue.splice(mediator.Queue.end(), state.Steps);
+ }
return true;
}
void Complete(const TActorContext &ctx) override {
- // start mediator queues
+ // Send steps to connected queues
for (ui64 mediatorId: Self->Config.Mediators->List()) {
TMediator &mediator = Self->Mediator(mediatorId, ctx);
- Y_UNUSED(mediator);
+ auto& state = Mediators[mediatorId];
+ for (auto& pr : state.Index) {
+ Y_VERIFY(!pr.second->Confirmed);
+ pr.second->Confirmed = true;
+ }
+ Self->SendMediatorStep(mediator, ctx);
}
// start plan process.
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 1edc81a258..0af758a563 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -26,7 +26,7 @@ static void SendTransactionStatus(const TActorId &proxy, TEvTxProxy::TEvProposeT
ctx.Send(proxy, new TEvTxProxy::TEvProposeTransactionStatus(status, txid, stepId));
}
-static TAutoPtr<TTransactionProposal> MakeTransactionProposal(TEvTxProxy::TEvProposeTransaction::TPtr &ev, ::NMonitoring::TDynamicCounters::TCounterPtr &counter) {
+static TTransactionProposal MakeTransactionProposal(TEvTxProxy::TEvProposeTransaction::TPtr &ev, ::NMonitoring::TDynamicCounters::TCounterPtr &counter) {
const TActorId &sender = ev->Sender;
const NKikimrTx::TEvProposeTransaction &record = ev->Get()->Record;
@@ -36,11 +36,11 @@ static TAutoPtr<TTransactionProposal> MakeTransactionProposal(TEvTxProxy::TEvPro
const ui64 maxStep = txrec.HasMaxStep() ? txrec.GetMaxStep() : Max<ui64>();
const bool ignoreLowDiskSpace = txrec.GetIgnoreLowDiskSpace();
- TAutoPtr<TTransactionProposal> proposal(new TTransactionProposal(sender, txId, minStep, maxStep, ignoreLowDiskSpace));
- proposal->AffectedSet.resize(txrec.AffectedSetSize());
+ TTransactionProposal proposal(sender, txId, minStep, maxStep, ignoreLowDiskSpace);
+ proposal.AffectedSet.resize(txrec.AffectedSetSize());
for (ui32 i = 0, e = txrec.AffectedSetSize(); i != e; ++i) {
const auto &x = txrec.GetAffectedSet(i);
- auto &s = proposal->AffectedSet[i];
+ auto &s = proposal.AffectedSet[i];
s.TabletId = x.GetTabletId();
Y_ASSERT(x.GetFlags() > 0 && x.GetFlags() <= 3);
@@ -77,32 +77,32 @@ TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet)
TabletCounters = TabletCountersPtr.Get();
}
-void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TActorContext &ctx) {
- proposal->AcceptMoment = ctx.Now();
+void TTxCoordinator::PlanTx(TTransactionProposal &&proposal, const TActorContext &ctx) {
+ proposal.AcceptMoment = ctx.Now();
MonCounters.PlanTxCalls->Inc();
- if (proposal->MaxStep <= VolatileState.LastPlanned) {
+ if (proposal.MaxStep <= VolatileState.LastPlanned) {
MonCounters.PlanTxOutdated->Inc();
- return SendTransactionStatus(proposal->Proxy
+ return SendTransactionStatus(proposal.Proxy
, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated
- , proposal->TxId, 0, ctx, TabletID());
+ , proposal.TxId, 0, ctx, TabletID());
}
if (Stopping) {
- return SendTransactionStatus(proposal->Proxy,
+ return SendTransactionStatus(proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting,
- proposal->TxId, 0, ctx, TabletID());
+ proposal.TxId, 0, ctx, TabletID());
}
- bool forRapidExecution = (proposal->MinStep <= VolatileState.LastPlanned);
+ bool forRapidExecution = (proposal.MinStep <= VolatileState.LastPlanned);
ui64 planStep = 0;
// cycle for flow control
- for (ui64 cstep = (proposal->MinStep + Config.Resolution - 1) / Config.Resolution * Config.Resolution; /*no-op*/;cstep += Config.Resolution) {
- if (cstep >= proposal->MaxStep) {
+ for (ui64 cstep = (proposal.MinStep + Config.Resolution - 1) / Config.Resolution * Config.Resolution; /*no-op*/;cstep += Config.Resolution) {
+ if (cstep >= proposal.MaxStep) {
MonCounters.PlanTxOutdated->Inc();
- return SendTransactionStatus(proposal->Proxy,
- TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, proposal->TxId, 0, ctx, TabletID());
+ return SendTransactionStatus(proposal.Proxy,
+ TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, proposal.TxId, 0, ctx, TabletID());
}
if (forRapidExecution) {
@@ -117,15 +117,14 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct
}
MonCounters.PlanTxAccepted->Inc();
- SendTransactionStatus(proposal->Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted,
- proposal->TxId, planStep, ctx, TabletID());
+ SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted,
+ proposal.TxId, planStep, ctx, TabletID());
if (forRapidExecution) {
TQueueType::TSlot &rapidSlot = VolatileState.Queue.RapidSlot;
- rapidSlot.Queue->Push(proposal.Release());
- ++rapidSlot.QueueSize;
+ rapidSlot.push_back(std::move(proposal));
- if (rapidSlot.QueueSize >= Config.RapidSlotFlushSize) {
+ if (rapidSlot.size() >= Config.RapidSlotFlushSize) {
SchedulePlanTickExact(planStep);
}
@@ -133,8 +132,7 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct
SchedulePlanTickAligned(planStep);
} else {
TQueueType::TSlot &planSlot = VolatileState.Queue.LowSlot(planStep);
- planSlot.Queue->Push(proposal.Release());
- ++planSlot.QueueSize;
+ planSlot.push_back(std::move(proposal));
// We may be sleeping at reduced resolution, try to wake up sooner
SchedulePlanTickExact(planStep);
@@ -142,29 +140,29 @@ void TTxCoordinator::PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TAct
}
void TTxCoordinator::Handle(TEvTxProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
- TAutoPtr<TTransactionProposal> proposal = MakeTransactionProposal(ev, MonCounters.TxIn);
- LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal->TxId
+ TTransactionProposal proposal = MakeTransactionProposal(ev, MonCounters.TxIn);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal.TxId
<< " HANDLE EvProposeTransaction marker# C0");
- PlanTx(proposal, ctx);
+ PlanTx(std::move(proposal), ctx);
}
void TTxCoordinator::HandleEnqueue(TEvTxProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
TryInitMonCounters(ctx);
- TAutoPtr<TTransactionProposal> proposal = MakeTransactionProposal(ev, MonCounters.TxIn);
- LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal->TxId
+ TTransactionProposal proposal = MakeTransactionProposal(ev, MonCounters.TxIn);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << proposal.TxId
<< " HANDLE Enqueue EvProposeTransaction");
if (Y_UNLIKELY(Stopping)) {
- return SendTransactionStatus(proposal->Proxy,
+ return SendTransactionStatus(proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting,
- proposal->TxId, 0, ctx, TabletID());
+ proposal.TxId, 0, ctx, TabletID());
}
if (!VolatileState.Queue.Unsorted)
- VolatileState.Queue.Unsorted.Reset(new TQueueType::TQ());
+ VolatileState.Queue.Unsorted.emplace();
- VolatileState.Queue.Unsorted->Push(proposal.Release());
+ VolatileState.Queue.Unsorted->push_back(std::move(proposal));
}
bool TTxCoordinator::AllowReducedPlanResolution() const {
@@ -281,9 +279,12 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte
}
if (VolatileState.Queue.Unsorted) {
- while (TAutoPtr<TTransactionProposal> x = VolatileState.Queue.Unsorted->Pop())
- PlanTx(x, ctx);
- VolatileState.Queue.Unsorted.Destroy();
+ while (!VolatileState.Queue.Unsorted->empty()) {
+ auto& proposal = VolatileState.Queue.Unsorted->front();
+ PlanTx(std::move(proposal), ctx);
+ VolatileState.Queue.Unsorted->pop_front();
+ }
+ VolatileState.Queue.Unsorted.reset();
}
const ui64 resolution = Config.Resolution;
@@ -307,14 +308,11 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte
return SchedulePlanTick();
}
- TVector<TQueueType::TSlot> slots;
+ std::deque<TQueueType::TSlot> slots;
- if (VolatileState.Queue.RapidSlot.QueueSize) {
- if (slots.empty()) {
- slots.reserve(1000);
- }
- slots.push_back(VolatileState.Queue.RapidSlot);
- VolatileState.Queue.RapidSlot = TQueueType::TSlot();
+ if (!VolatileState.Queue.RapidSlot.empty()) {
+ slots.push_back(std::move(VolatileState.Queue.RapidSlot));
+ VolatileState.Queue.RapidSlot.clear();
}
while (!VolatileState.Queue.Low.empty()) {
@@ -322,11 +320,8 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte
if (frontIt->first > next)
break;
- if (frontIt->second.QueueSize) {
- if (slots.empty()) {
- slots.reserve(1000);
- }
- slots.push_back(frontIt->second);
+ if (!frontIt->second.empty()) {
+ slots.push_back(std::move(frontIt->second));
}
VolatileState.Queue.Low.erase(frontIt);
@@ -338,46 +333,43 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte
VolatileState.LastPlanned = next;
NotifyUpdatedLastStep();
- Execute(CreateTxPlanStep(next, slots), ctx);
+ Execute(CreateTxPlanStep(next, std::move(slots)), ctx);
SchedulePlanTick();
}
-void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx) {
- TEvTxCoordinator::TEvMediatorQueueConfirmations *msg = ev->Get();
+void TTxCoordinator::Handle(TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx) {
+ TEvMediatorQueueConfirmations *msg = ev->Get();
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID()
<< " HANDLE EvMediatorQueueConfirmations MediatorId# " << msg->Confirmations->MediatorId);
- Execute(CreateTxMediatorConfirmations(msg->Confirmations), ctx);
+ Execute(CreateTxMediatorConfirmations(std::move(msg->Confirmations)), ctx);
}
-void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx) {
- const TEvTxCoordinator::TEvMediatorQueueStop *msg = ev->Get();
+void TTxCoordinator::Handle(TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx) {
+ const TEvMediatorQueueStop *msg = ev->Get();
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID()
<< " HANDLE EvMediatorQueueStop MediatorId# " << msg->MediatorId);
TMediator &mediator = Mediator(msg->MediatorId, ctx);
- mediator.PushUpdates = false;
- mediator.GenCookie = Max<ui64>();
- mediator.Queue.Destroy();
+ mediator.Active = false;
}
-void TTxCoordinator::Handle(TEvTxCoordinator::TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx) {
- const TEvTxCoordinator::TEvMediatorQueueRestart *msg = ev->Get();
+void TTxCoordinator::Handle(TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx) {
+ const TEvMediatorQueueRestart *msg = ev->Get();
LOG_NOTICE_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID()
<< " HANDLE EvMediatorQueueRestart MediatorId# " << msg->MediatorId);
TMediator &mediator = Mediator(msg->MediatorId, ctx);
- mediator.PushUpdates = false;
- mediator.GenCookie = msg->GenCookie;
- mediator.Queue.Reset(new TMediator::TStepQueue());
- Execute(CreateTxRestartMediatorQueue(msg->MediatorId, msg->GenCookie), ctx);
+ mediator.Active = true;
+ SendMediatorStep(mediator, ctx);
}
-void TTxCoordinator::Handle(TEvTxCoordinator::TEvCoordinatorConfirmPlan::TPtr &ev, const TActorContext &ctx) {
- TAutoPtr<TCoordinatorStepConfirmations> confirmations = ev->Get()->Confirmations;
- while (TAutoPtr<TCoordinatorStepConfirmations::TEntry> x = confirmations->Queue->Pop()) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << x->TxId
- << " stepId# " << x->Step << " Status# " << x->Status
- << " SEND EvProposeTransactionStatus to# " << x->ProxyId.ToString() << " Proxy");
- ctx.Send(x->ProxyId, new TEvTxProxy::TEvProposeTransactionStatus(x->Status, x->TxId, x->Step));
+void TTxCoordinator::SendStepConfirmations(TCoordinatorStepConfirmations &confirmations, const TActorContext &ctx) {
+ while (!confirmations.Queue.empty()) {
+ auto &x = confirmations.Queue.front();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " txid# " << x.TxId
+ << " stepId# " << x.Step << " Status# " << x.Status
+ << " SEND EvProposeTransactionStatus to# " << x.ProxyId.ToString() << " Proxy");
+ ctx.Send(x.ProxyId, new TEvTxProxy::TEvProposeTransactionStatus(x.Status, x.TxId, x.Step));
+ confirmations.Queue.pop_front();
}
}
@@ -503,24 +495,39 @@ void TTxCoordinator::TryInitMonCounters(const TActorContext &ctx) {
}
void TTxCoordinator::SendMediatorStep(TMediator &mediator, const TActorContext &ctx) {
- while (TMediatorStep *step = mediator.Queue->Head()) {
- if (!step->Confirmed)
- return;
+ if (!mediator.Active) {
+ // We don't want to update LastSentStep when mediators are not empty
+ return;
+ }
- TAutoPtr<TMediatorStep> extracted = mediator.Queue->Pop();
- for (const auto& tx: extracted->Transactions) {
+ std::unique_ptr<TEvMediatorQueueStep> msg;
+ while (!mediator.Queue.empty()) {
+ auto it = mediator.Queue.begin();
+ if (!it->Confirmed) {
+ break;
+ }
+
+ for (const auto& tx: it->Transactions) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "Send from# " << TabletID()
- << " to mediator# " << extracted->MediatorId << ", step# " << extracted->Step
+ << " to mediator# " << it->MediatorId << ", step# " << it->Step
<< ", txid# " << tx.TxId << " marker# C2");
}
- if (VolatileState.LastSentStep < extracted->Step) {
- VolatileState.LastSentStep = extracted->Step;
+ if (VolatileState.LastSentStep < it->Step) {
+ VolatileState.LastSentStep = it->Step;
if (ReadStepSubscriptionManager) {
ctx.Send(ReadStepSubscriptionManager, new TEvPrivate::TEvReadStepUpdated(VolatileState.LastSentStep));
}
}
- ctx.Send(mediator.QueueActor, new TEvTxCoordinator::TEvMediatorQueueStep(mediator.GenCookie, extracted));
+
+ if (!msg) {
+ msg.reset(new TEvMediatorQueueStep());
+ }
+ msg->SpliceStep(mediator.Queue, it);
+ }
+
+ if (msg) {
+ ctx.Send(mediator.QueueActor, msg.release());
}
}
@@ -565,22 +572,24 @@ void TTxCoordinator::OnTabletStop(TEvTablet::TEvTabletStop::TPtr &ev, const TAct
void TTxCoordinator::OnStopGuardStarting(const TActorContext &ctx) {
auto processQueue = [&](auto &queue) {
- while (TAutoPtr<TTransactionProposal> proposal = queue.Pop()) {
- SendTransactionStatus(proposal->Proxy,
+ while (!queue.empty()) {
+ auto& proposal = queue.front();
+ SendTransactionStatus(proposal.Proxy,
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting,
- proposal->TxId, 0, ctx, TabletID());
+ proposal.TxId, 0, ctx, TabletID());
+ queue.pop_front();
}
};
if (VolatileState.Queue.Unsorted) {
processQueue(*VolatileState.Queue.Unsorted);
- VolatileState.Queue.Unsorted.Destroy();
+ VolatileState.Queue.Unsorted.reset();
}
- processQueue(*VolatileState.Queue.RapidSlot.Queue);
+ processQueue(VolatileState.Queue.RapidSlot);
for (auto &kv : VolatileState.Queue.Low) {
- processQueue(*kv.second.Queue);
+ processQueue(kv.second);
}
VolatileState.Queue.Low.clear();
}
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index cd8968308f..e43e0026dc 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -15,7 +15,6 @@
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/scheme/scheme_types_defs.h>
#include <ydb/core/tx/tx.h>
-#include <ydb/core/util/queue_oneone_inplace.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -80,15 +79,7 @@ struct TCoordinatorStepConfirmations {
{}
};
- using TQ = TOneOneQueueInplace<TEntry *, 128>;
-
- const TAutoPtr<TQ, TQ::TPtrCleanDestructor> Queue;
- const TStepId Step;
-
- TCoordinatorStepConfirmations(TStepId step)
- : Queue(new TQ())
- , Step(step)
- {}
+ std::deque<TEntry> Queue;
};
struct TMediatorStep {
@@ -100,7 +91,7 @@ struct TMediatorStep {
ui64 Moderator;
- TTx(TTxId txId, TTabletId *affected, ui32 affectedSize, ui64 moderator)
+ TTx(TTxId txId, const TTabletId *affected, ui32 affectedSize, ui64 moderator)
: TxId(txId)
, PushToAffected(affected, affected + affectedSize)
, Moderator(moderator)
@@ -110,16 +101,21 @@ struct TMediatorStep {
TTx(TTxId txId)
: TxId(txId)
+ , Moderator(0)
{
Y_VERIFY(TxId != 0);
}
};
- const TTabletId MediatorId;
- const TStepId Step;
+ TTabletId MediatorId;
+ TStepId Step;
bool Confirmed;
+
TVector<TTx> Transactions;
+ // Used by mediator queue to track acks
+ size_t References = 0;
+
TMediatorStep(TTabletId mediatorId, TStepId step)
: MediatorId(mediatorId)
, Step(step)
@@ -127,6 +123,8 @@ struct TMediatorStep {
{}
};
+using TMediatorStepList = std::list<TMediatorStep>;
+
struct TMediatorConfirmations {
const TTabletId MediatorId;
@@ -137,6 +135,45 @@ struct TMediatorConfirmations {
{}
};
+struct TEvMediatorQueueStep : public TEventLocal<TEvMediatorQueueStep, TEvTxCoordinator::EvMediatorQueueStep> {
+ TMediatorStepList Steps;
+
+ TEvMediatorQueueStep() = default;
+
+ void SpliceStep(TMediatorStepList& list, TMediatorStepList::iterator it) {
+ Steps.splice(Steps.end(), list, it);
+ }
+};
+
+struct TEvMediatorQueueRestart : public TEventLocal<TEvMediatorQueueRestart, TEvTxCoordinator::EvMediatorQueueRestart> {
+ const ui64 MediatorId;
+ const ui64 StartFrom;
+ const ui64 GenCookie;
+
+ TEvMediatorQueueRestart(ui64 mediatorId, ui64 startFrom, ui64 genCookie)
+ : MediatorId(mediatorId)
+ , StartFrom(startFrom)
+ , GenCookie(genCookie)
+ {}
+};
+
+struct TEvMediatorQueueStop : public TEventLocal<TEvMediatorQueueStop, TEvTxCoordinator::EvMediatorQueueStop> {
+ const ui64 MediatorId;
+
+ TEvMediatorQueueStop(ui64 mediatorId)
+ : MediatorId(mediatorId)
+ {}
+};
+
+struct TEvMediatorQueueConfirmations : public TEventLocal<TEvMediatorQueueConfirmations, TEvTxCoordinator::EvMediatorQueueConfirmations> {
+ std::unique_ptr<NFlatTxCoordinator::TMediatorConfirmations> Confirmations;
+
+ TEvMediatorQueueConfirmations(std::unique_ptr<NFlatTxCoordinator::TMediatorConfirmations> &&confirmations)
+ : Confirmations(std::move(confirmations))
+ {}
+};
+
+
IActor* CreateTxCoordinatorMediatorQueue(const TActorId &owner, ui64 coordinator, ui64 mediator, ui64 coordinatorGeneration);
using NTabletFlatExecutor::TTabletExecutedFlat;
@@ -253,8 +290,6 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
};
struct TQueueType {
- typedef TOneOneQueueInplace<TTransactionProposal *, 512> TQ;
-
struct TFlowEntry {
ui16 Weight;
ui16 Limit;
@@ -265,14 +300,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
{}
};
- struct TSlot {
- TAutoPtr<TQ, TQ::TPtrCleanDestructor> Queue;
- ui64 QueueSize;
+ using TQueue = std::deque<TTransactionProposal>;
- TSlot()
- : Queue(new TQ())
- , QueueSize(0)
- {}
+ struct TSlot : public TQueue {
+ using TQueue::TQueue;
};
typedef TMap<TStepId, TSlot> TSlotQueue;
@@ -280,15 +311,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
TSlotQueue Low;
TSlot RapidSlot; // slot for entries with schedule on 'right now' moment (actually - with min-schedule time in past).
- TAutoPtr<TQ, TQ::TPtrCleanDestructor> Unsorted;
+ std::optional<TSlot> Unsorted;
TSlot& LowSlot(TStepId step) {
- TMap<TStepId, TSlot>::iterator it = Low.find(step);
- if (it != Low.end())
- return it->second;
- std::pair<TMap<TStepId, TSlot>::iterator, bool> xit = Low.insert(TSlotQueue::value_type(step, TSlot()));
- TSlot &ret = xit.first->second;
- return ret;
+ return Low[step];
}
TStepId MinLowSlot() const {
@@ -308,7 +334,6 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
struct TTxSchema;
struct TTxUpgrade;
struct TTxPlanStep;
- struct TTxRestartMediatorQueue;
struct TTxMediatorConfirmations;
struct TTxConsistencyCheck;
struct TTxMonitoring;
@@ -327,9 +352,9 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
const NKikimrSubDomains::TProcessingParams &config);
ITransaction* CreateTxSchema();
ITransaction* CreateTxUpgrade();
- ITransaction* CreateTxPlanStep(TStepId toStep, TVector<TQueueType::TSlot> &slots);
+ ITransaction* CreateTxPlanStep(TStepId toStep, std::deque<TQueueType::TSlot> &&slots);
ITransaction* CreateTxRestartMediatorQueue(TTabletId mediatorId, ui64 genCookie);
- ITransaction* CreateTxMediatorConfirmations(TAutoPtr<TMediatorConfirmations> &confirmations);
+ ITransaction* CreateTxMediatorConfirmations(std::unique_ptr<TMediatorConfirmations> &&confirmations);
ITransaction* CreateTxConsistencyCheck();
ITransaction* CreateTxMonitoring(NMon::TEvRemoteHttpInfo::TPtr& ev);
ITransaction* CreateTxStopGuard();
@@ -349,18 +374,9 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
};
struct TMediator {
- typedef TOneOneQueueInplace<TMediatorStep *, 32> TStepQueue;
-
TActorId QueueActor;
- ui64 GenCookie;
- bool PushUpdates;
-
- TAutoPtr<TStepQueue, TStepQueue::TPtrCleanDestructor> Queue;
-
- TMediator()
- : GenCookie(0)
- , PushUpdates(false)
- {}
+ TMediatorStepList Queue;
+ bool Active = false;
};
struct TTransaction {
@@ -653,27 +669,26 @@ private:
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev);
void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvTxCoordinator::TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvTxCoordinator::TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvTxCoordinator::TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvTxCoordinator::TEvCoordinatorConfirmPlan::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorQueueStop::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorQueueRestart::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorQueueConfirmations::TPtr &ev, const TActorContext &ctx);
void Handle(TEvSubDomain::TEvConfigure::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx);
+ void SendStepConfirmations(TCoordinatorStepConfirmations &confirmations, const TActorContext &ctx);
void DoConfiguration(const TEvSubDomain::TEvConfigure &ev, const TActorContext &ctx, const TActorId &ackTo = TActorId());
void Sync(ui64 mediator, const TActorContext &ctx);
void Sync(const TActorContext &ctx);
- void PlanTx(TAutoPtr<TTransactionProposal> &proposal, const TActorContext &ctx);
+ void PlanTx(TTransactionProposal &&proposal, const TActorContext &ctx);
bool AllowReducedPlanResolution() const;
void SchedulePlanTick();
void SchedulePlanTickExact(ui64 next);
void SchedulePlanTickAligned(ui64 next);
- bool RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const;
void TryInitMonCounters(const TActorContext &ctx);
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
@@ -728,10 +743,9 @@ public:
hFunc(TEvTxProxy::TEvSubscribeLastStep, Handle);
hFunc(TEvTxProxy::TEvUnsubscribeLastStep, Handle);
HFunc(TEvPrivate::TEvPlanTick, Handle);
- HFunc(TEvTxCoordinator::TEvMediatorQueueConfirmations, Handle);
- HFunc(TEvTxCoordinator::TEvMediatorQueueRestart, Handle);
- HFunc(TEvTxCoordinator::TEvMediatorQueueStop, Handle);
- HFunc(TEvTxCoordinator::TEvCoordinatorConfirmPlan, Handle);
+ HFunc(TEvMediatorQueueConfirmations, Handle);
+ HFunc(TEvMediatorQueueRestart, Handle);
+ HFunc(TEvMediatorQueueStop, Handle);
HFunc(TEvTabletPipe::TEvServerConnected, Handle);
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle);
diff --git a/ydb/core/tx/coordinator/coordinator_ut.cpp b/ydb/core/tx/coordinator/coordinator_ut.cpp
index d7c673a5dc..7140b98a1d 100644
--- a/ydb/core/tx/coordinator/coordinator_ut.cpp
+++ b/ydb/core/tx/coordinator/coordinator_ut.cpp
@@ -32,9 +32,9 @@ namespace NKikimr::NFlatTxCoordinator::NTest {
ui64 lastMediatorStep = 0;
auto observeMediatorSteps = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
switch (ev->GetTypeRewrite()) {
- case TEvTxCoordinator::TEvMediatorQueueStep::EventType: {
- auto* msg = ev->Get<TEvTxCoordinator::TEvMediatorQueueStep>();
- ui64 step = msg->Step->Step;
+ case TEvMediatorQueueStep::EventType: {
+ auto* msg = ev->Get<TEvMediatorQueueStep>();
+ ui64 step = msg->Steps.back().Step;
lastMediatorStep = Max(lastMediatorStep, step);
break;
}
@@ -476,6 +476,59 @@ namespace NKikimr::NFlatTxCoordinator::NTest {
UNIT_ASSERT_C(hooks.PersistConfig_.empty(), "Unexpected persist attempt after a second reboot");
}
+ Y_UNIT_TEST(LastEmptyStepResent) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(1)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+
+ auto &runtime = *server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
+
+ auto sender = runtime.AllocateEdgeActor();
+ ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ std::vector<ui64> emptySteps;
+ auto stepsObserver = [&](auto&, auto& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxCoordinator::TEvCoordinatorStep::EventType: {
+ auto* msg = ev->template Get<TEvTxCoordinator::TEvCoordinatorStep>();
+ ui64 step = msg->Record.GetStep();
+ bool empty = msg->Record.TransactionsSize() == 0;
+ Cerr << "... observed " << step << ": " << (empty ? "empty" : "not empty") << Endl;
+ if (empty) {
+ emptySteps.push_back(step);
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(stepsObserver);
+
+ runtime.SimulateSleep(TDuration::Seconds(10));
+ UNIT_ASSERT_C(emptySteps.size() > 1, "Expected multiple empty steps, not " << emptySteps.size());
+ ui64 lastObserved = emptySteps.back();
+ emptySteps.clear();
+
+ RebootTablet(runtime, mediatorId, sender);
+ if (emptySteps.empty()) {
+ Cerr << "... waiting for empty steps" << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return !emptySteps.empty();
+ };
+ runtime.DispatchEvents(options);
+ }
+ UNIT_ASSERT_C(!emptySteps.empty(), "Expected at least one empty step");
+ UNIT_ASSERT_C(emptySteps.front() == lastObserved,
+ "Expected to observe " << lastObserved << " empty step, not " << emptySteps.front());
+ }
+
} // Y_UNIT_TEST_SUITE(Coordinator)
diff --git a/ydb/core/tx/coordinator/mediator_queue.cpp b/ydb/core/tx/coordinator/mediator_queue.cpp
index 9b3955fa1b..4195c1cbe8 100644
--- a/ydb/core/tx/coordinator/mediator_queue.cpp
+++ b/ydb/core/tx/coordinator/mediator_queue.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
namespace NKikimr {
namespace NFlatTxCoordinator {
@@ -17,11 +18,15 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
const ui64 CoordinatorGeneration;
TActorId PipeClient;
- ui64 GenCookie;
- ui64 PrevStep;
+ ui64 GenCookie = 0;
+ ui64 PrevStep = 0;
+ bool Active = false;
- TAutoPtr<TMediatorConfirmations> Confirmations;
+ TMediatorStepList Queue;
+ THashMap<std::pair<ui64, ui64>, TMediatorStepList::iterator> WaitingAcks;
+
+ std::unique_ptr<TMediatorConfirmations> Confirmations;
void Die(const TActorContext &ctx) override {
if (PipeClient) {
@@ -45,6 +50,57 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
<< " tablet# " << Coordinator << " SEND EvCoordinatorSync to# " << Mediator << " Mediator");
NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorSync(++GenCookie, Mediator, Coordinator));
Become(&TThis::StateSync);
+ Active = false;
+ }
+
+ void SendStep(const TMediatorStep &step, const TActorContext &ctx) {
+ LOG_DEBUG(ctx, NKikimrServices::TX_COORDINATOR_PRIVATE, "[%" PRIu64 "] to [%" PRIu64 "], step [%" PRIu64 "]", Coordinator, Mediator, step.Step);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
+ << " tablet# " << Coordinator << " SEND to# " << Mediator << " Mediator TEvCoordinatorStep");
+ NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorStep(
+ step, PrevStep, Mediator, Coordinator, CoordinatorGeneration));
+ PrevStep = step.Step;
+ }
+
+ void SendConfirmations(const TActorContext &ctx) {
+ if (Confirmations) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
+ << " tablet# " << Coordinator << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString()
+ << " Owner");
+ ctx.Send(Owner, new TEvMediatorQueueConfirmations(std::move(Confirmations)));
+ }
+ }
+
+ /**
+ * Filters step by removing txs/shards that have already been acknowledged
+ */
+ void FilterAcked(TMediatorStep &step) {
+ auto end = std::remove_if(
+ step.Transactions.begin(),
+ step.Transactions.end(),
+ [this](TMediatorStep::TTx& tx) -> bool {
+ ui64 txId = tx.TxId;
+ auto end = std::remove_if(
+ tx.PushToAffected.begin(),
+ tx.PushToAffected.end(),
+ [this, txId](ui64 shardId) -> bool {
+ // Remove shards that have been acknowledged (not in a waiting set)
+ return WaitingAcks.find(std::make_pair(txId, shardId)) == WaitingAcks.end();
+ });
+ if (end != tx.PushToAffected.end()) {
+ tx.PushToAffected.erase(end, tx.PushToAffected.end());
+ }
+ // Remove transactions without any shards left
+ return tx.PushToAffected.empty();
+ });
+ if (end != step.Transactions.end()) {
+ step.Transactions.erase(end, step.Transactions.end());
+ // Note: fully acked steps are removed immediately
+ if (Y_UNLIKELY(step.Transactions.empty())) {
+ step.Transactions.shrink_to_fit();
+ }
+ }
}
void Handle(TEvTxCoordinator::TEvCoordinatorSyncResult::TPtr &ev, const TActorContext &ctx) {
@@ -62,34 +118,55 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
PrevStep = 0;
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
<< " tablet# " << Coordinator << " SEND EvMediatorQueueRestart to# " << Owner.ToString() << " Owner");
- ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueRestart(Mediator, 0, ++GenCookie));
+ ctx.Send(Owner, new TEvMediatorQueueRestart(Mediator, 0, ++GenCookie));
Become(&TThis::StateWork);
- }
-
- void Handle(TEvTxCoordinator::TEvMediatorQueueStep::TPtr &ev, const TActorContext &ctx) {
- TEvTxCoordinator::TEvMediatorQueueStep *msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
- << " HANDLE EvMediatorQueueStep step# " << msg->Step->Step);
- if (msg->GenCookie == GenCookie && PipeClient) {
- const NFlatTxCoordinator::TMediatorStep &step = *msg->Step;
+ Active = true;
+ for (auto& step : Queue) {
+ FilterAcked(step);
+ // TODO: limit the number of steps inflight
+ SendStep(step, ctx);
+ }
+ }
- LOG_DEBUG(ctx, NKikimrServices::TX_COORDINATOR_PRIVATE, "[%" PRIu64 "] to [%" PRIu64 "], step [%" PRIu64 "]", Coordinator, Mediator, step.Step);
+ void Handle(TEvMediatorQueueStep::TPtr &ev, const TActorContext &ctx) {
+ TEvMediatorQueueStep *msg = ev->Get();
+ while (!msg->Steps.empty()) {
+ auto it = msg->Steps.begin();
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
- << " tablet# " << Coordinator << " SEND to# " << Mediator << " Mediator TEvCoordinatorStep");
- NTabletPipe::SendData(ctx, PipeClient, new TEvTxCoordinator::TEvCoordinatorStep(
- step, PrevStep, Mediator, Coordinator, CoordinatorGeneration));
- PrevStep = step.Step;
+ << " HANDLE EvMediatorQueueStep step# " << it->Step);
+
+ // Remove the last empty step (empty steps except the last one are not needed)
+ if (!Queue.empty() && Queue.back().Transactions.empty()) {
+ Queue.pop_back();
+ }
+
+ Queue.splice(Queue.end(), msg->Steps, it);
+
+ // Index by TxId/ShardId pairs while waiting for acks
+ Y_VERIFY(it->References == 0);
+ for (auto& tx : it->Transactions) {
+ for (ui64 shardId : tx.PushToAffected) {
+ auto res = WaitingAcks.insert(
+ std::make_pair(
+ std::make_pair(tx.TxId, shardId),
+ it));
+ Y_VERIFY_DEBUG(res.second);
+ if (res.second) {
+ it->References++;
+ }
+ }
+ }
+
+ if (Active) {
+ // TODO: limit the number of steps inflight
+ SendStep(*it, ctx);
+ }
}
- if (Confirmations) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
- << " tablet# " << Coordinator << " SEND EvMediatorQueueConfirmations to# " << Owner.ToString()
- << " Owner");
- ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueConfirmations(Confirmations));
- }
+ SendConfirmations(ctx);
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
@@ -112,10 +189,11 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
<< " HANDLE EvClientDestroyed");
if (msg->ClientId != PipeClient)
return;
+ Active = false;
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
<< " tablet# " << Coordinator << " SEND EvMediatorQueueStop to# " << Owner.ToString()
<< " Owner Mediator# " << Mediator);
- ctx.Send(Owner, new TEvTxCoordinator::TEvMediatorQueueStop(Mediator));
+ ctx.Send(Owner, new TEvMediatorQueueStop(Mediator));
Sync(ctx);
}
@@ -124,12 +202,36 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR_MEDIATOR_QUEUE, "Actor# " << ctx.SelfID.ToString()
<< " HANDLE EvPlanStepAck");
- if (!Confirmations)
- Confirmations.Reset(new TMediatorConfirmations(Mediator));
+ bool fullyConfirmed = false;
const TTabletId tabletId = record.GetTabletId();
for (const auto txid : record.GetTxId()) {
- Confirmations->Acks[txid].insert(tabletId);
+ auto it = WaitingAcks.find(std::make_pair(txid, tabletId));
+ if (it != WaitingAcks.end()) {
+ Y_VERIFY(it->second->References > 0);
+
+ if (!Confirmations)
+ Confirmations.reset(new TMediatorConfirmations(Mediator));
+ Confirmations->Acks[txid].insert(tabletId);
+
+ if (0 == --it->second->References) {
+ Y_VERIFY(!Queue.empty());
+ auto last = --Queue.end();
+ if (it->second != last) {
+ Queue.erase(it->second);
+ } else {
+ // On reconnect we will resend it as empty
+ it->second->Transactions.clear();
+ it->second->Transactions.shrink_to_fit();
+ }
+ fullyConfirmed = true;
+ }
+ WaitingAcks.erase(it);
+ }
+ }
+
+ if (fullyConfirmed) {
+ SendConfirmations(ctx);
}
}
@@ -143,9 +245,6 @@ public:
, Coordinator(coordinator)
, Mediator(mediator)
, CoordinatorGeneration(coordinatorGeneration)
- , PipeClient()
- , GenCookie(0)
- , PrevStep(0)
{}
void Bootstrap(const TActorContext &ctx) {
@@ -165,7 +264,7 @@ public:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
- HFunc(TEvTxCoordinator::TEvMediatorQueueStep, Handle);
+ HFunc(TEvMediatorQueueStep, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
CFunc(TEvents::TSystem::PoisonPill, Die)
}
diff --git a/ydb/core/tx/coordinator/ya.make b/ydb/core/tx/coordinator/ya.make
index efd565a3db..d5377a15af 100644
--- a/ydb/core/tx/coordinator/ya.make
+++ b/ydb/core/tx/coordinator/ya.make
@@ -14,7 +14,6 @@ SRCS(
coordinator__plan_step.cpp
coordinator__read_step_subscriptions.cpp
coordinator__restore_params.cpp
- coordinator__restart_mediator.cpp
coordinator__restore_transaction.cpp
coordinator__schema.cpp
coordinator__schema_upgrade.cpp
@@ -27,6 +26,7 @@ PEERDIR(
library/cpp/actors/core
library/cpp/actors/helpers
library/cpp/actors/interconnect
+ library/cpp/containers/absl_flat_hash
ydb/core/actorlib_impl
ydb/core/base
ydb/core/engine/minikql