aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2025-02-04 17:12:56 +0300
committerGitHub <noreply@github.com>2025-02-04 17:12:56 +0300
commite6e5842c8d256519bcb4ae856b4d6983b81ae0ab (patch)
treed20600a36680a212660a8f0c8923bdb511abe521
parent60a898ffe0a3452ccc8cc320f2eb6a9804111e0d (diff)
downloadydb-e6e5842c8d256519bcb4ae856b4d6983b81ae0ab.tar.gz
Fix mediator merge for out-of-order transactions (#14182)
-rw-r--r--ydb/core/tx/mediator/mediator_ut.cpp104
-rw-r--r--ydb/core/tx/mediator/tablet_queue.cpp97
2 files changed, 138 insertions, 63 deletions
diff --git a/ydb/core/tx/mediator/mediator_ut.cpp b/ydb/core/tx/mediator/mediator_ut.cpp
index 43b7a35720..2f956fe940 100644
--- a/ydb/core/tx/mediator/mediator_ut.cpp
+++ b/ydb/core/tx/mediator/mediator_ut.cpp
@@ -416,8 +416,20 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
}
};
+ struct TCoordinatorIndex {
+ size_t Value;
+
+ explicit TCoordinatorIndex(size_t value)
+ : Value(value)
+ {}
+ };
+
class TMediatorTestWithWatcher : public NUnitTest::TBaseFixture {
public:
+ TMediatorTestWithWatcher(ui64 coordinatorCount = 1)
+ : CoordinatorCount(coordinatorCount)
+ {}
+
void SetUp(NUnitTest::TTestContext&) override {
auto& pm = PM.emplace();
@@ -434,7 +446,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TABLETQUEUE, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_PRIVATE, NLog::PRI_TRACE);
- CoordinatorId = ChangeStateStorage(TTestTxConfig::Coordinator, Server->GetSettings().Domain);
+ for (ui64 i = 0; i < CoordinatorCount; ++i) {
+ CoordinatorIds.push_back(ChangeStateStorage(TTestTxConfig::Coordinator + i, Server->GetSettings().Domain));
+ }
MediatorId = ChangeStateStorage(TTestTxConfig::TxTablet0, Server->GetSettings().Domain);
MediatorBootstrapper = CreateTestBootstrapper(runtime,
@@ -451,7 +465,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
auto msg = std::make_unique<TEvSubDomain::TEvConfigure>();
msg->Record.SetVersion(1);
msg->Record.SetPlanResolution(500);
- msg->Record.AddCoordinators(CoordinatorId);
+ for (ui64 coordinatorId : CoordinatorIds) {
+ msg->Record.AddCoordinators(coordinatorId);
+ }
msg->Record.AddMediators(MediatorId);
msg->Record.SetTimeCastBucketsPerMediator(1);
runtime.SendToPipe(MediatorId, Sender, msg.release());
@@ -482,11 +498,11 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
return client;
}
- NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) {
+ NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui64 genCookie) {
auto& runtime = GetRuntime();
TActorId client = QueuePipeClient(queue);
- runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorId));
+ runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorIds.at(coordinatorIndex.Value)));
auto ev = runtime.GrabEdgeEventRethrow<TEvTxCoordinator::TEvCoordinatorSyncResult>(queue);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetCookie(), genCookie);
@@ -494,13 +510,17 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
return std::move(msg->Record);
}
- void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
+ NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) {
+ return Sync(queue, TCoordinatorIndex(0), genCookie);
+ }
+
+ void SendStep(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
auto& runtime = GetRuntime();
TActorId client = QueuePipeClient(queue);
// Note: prevStep is not actually used by mediator
auto msg = std::make_unique<TEvTxCoordinator::TEvCoordinatorStep>(
- step, /* prevStep */ 0, MediatorId, CoordinatorId, gen);
+ step, /* prevStep */ 0, MediatorId, CoordinatorIds.at(coordinatorIndex.Value), gen);
size_t totalAffected = 0;
for (auto& pr : txs) {
auto* protoTx = msg->Record.AddTransactions();
@@ -514,6 +534,10 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
runtime.SendToPipe(client, queue, msg.release());
}
+ void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
+ return SendStep(queue, TCoordinatorIndex(0), gen, step, std::move(txs));
+ }
+
ui64 AddTargetTablet() {
auto& runtime = GetRuntime();
@@ -575,7 +599,8 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
TServer::TPtr Server;
TActorId Sender;
- ui64 CoordinatorId;
+ ui64 CoordinatorCount;
+ std::vector<ui64> CoordinatorIds;
ui64 MediatorId;
TActorId MediatorBootstrapper;
@@ -589,6 +614,13 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
THashMap<TActorId, TActorId> PerQueuePipes; // Queue -> PipeClient
};
+ class TMediatorTestWithWatcherTwoCoordinators : public TMediatorTestWithWatcher {
+ public:
+ TMediatorTestWithWatcherTwoCoordinators()
+ : TMediatorTestWithWatcher(2)
+ {}
+ };
+
Y_UNIT_TEST_F(BasicTimecastUpdates, TMediatorTestWithWatcher) {
auto& runtime = GetRuntime();
@@ -1201,6 +1233,64 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
}
}
+ Y_UNIT_TEST_F(OneCoordinatorResendTxNotLost, TMediatorTestWithWatcherTwoCoordinators) {
+ auto& runtime = GetRuntime();
+
+ ui64 tablet1 = AddTargetTablet();
+ ui64 tablet2 = AddTargetTablet();
+ AddWatchTablet(tablet1);
+ AddWatchTablet(tablet2);
+ WaitNoPending();
+ WatcherState->Updates.clear();
+
+ auto queue1 = runtime.AllocateEdgeActor();
+ Sync(queue1, TCoordinatorIndex(0), 1);
+ auto queue2 = runtime.AllocateEdgeActor();
+ Sync(queue2, TCoordinatorIndex(1), 1);
+
+ TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);
+
+ SendStep(queue1, TCoordinatorIndex(0), /* gen */ 1, /* step */ 1010, {
+ {1, {tablet1, tablet2}},
+ });
+ SendStep(queue2, TCoordinatorIndex(1), /* gen */ 1, /* step */ 1010, {
+ {2, {tablet1, tablet2}},
+ });
+ runtime.SimulateSleep(TDuration::MilliSeconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates(
+ TGranularUpdate(1010, {{tablet1, 1009}, {tablet2, 1009}})));
+ WatcherState->Updates.clear();
+ UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
+ blockedPlan.clear();
+
+ // Simulate one coordinator restarting and resending the step
+ auto queue3 = runtime.AllocateEdgeActor();
+ Sync(queue3, TCoordinatorIndex(1), 2);
+ SendStep(queue3, TCoordinatorIndex(1), /* gen */ 2, /* step */ 1010, {
+ {2, {tablet1, tablet2}},
+ });
+ runtime.SimulateSleep(TDuration::MilliSeconds(1));
+
+ // Reboot tablets, we expect plans to be resent
+ RebootTablet(runtime, tablet1, Sender);
+ RebootTablet(runtime, tablet2, Sender);
+ runtime.SimulateSleep(TDuration::MilliSeconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates());
+ UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
+
+ // Tablets must see both transactions
+ // Note: the bug was causing some transactions to be lost
+ THashSet<ui64> observedTxIds;
+ for (auto& ev : blockedPlan) {
+ auto* msg = ev->Get();
+ for (const auto& tx : msg->Record.GetTransactions()) {
+ observedTxIds.insert(tx.GetTxId());
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(observedTxIds.size(), 2u);
+ }
+
}
} // namespace NKikimr::NTxMediator
diff --git a/ydb/core/tx/mediator/tablet_queue.cpp b/ydb/core/tx/mediator/tablet_queue.cpp
index 2198d23a36..45df30cb0d 100644
--- a/ydb/core/tx/mediator/tablet_queue.cpp
+++ b/ydb/core/tx/mediator/tablet_queue.cpp
@@ -35,9 +35,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TStepEntry* const StepRef;
// The current list of transactions
TVector<TTx> Transactions;
- // An updated list of transactions (after last sent to tablet)
- // We may need to send acks to the updated AckTo actor
- TVector<TTx> OutOfOrder;
TStep(TStepEntry* stepRef)
: StepRef(stepRef)
@@ -60,8 +57,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
return State == EState::Init && Queue.empty() && Watchers.empty();
}
- void MergeOutOfOrder(TStep* sx);
- void MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update);
+ void MergeOutOfOrder(TStep* sx, TVector<TTx>&& update);
};
struct TStepEntry {
@@ -230,19 +226,19 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TTabletEntry* tabletEntry = Tablets.FindPtr(tabletId);
if (tabletEntry == nullptr) {
// We don't have a tablet entry, which means no steps inflight
- AckOoO(tabletId, step, msg->Transactions, ctx);
+ AckTransactions(tabletId, step, msg->Transactions, ctx);
return;
}
auto it = tabletEntry->Queue.find(step);
if (it == tabletEntry->Queue.end()) {
// This step is already confirmed, reply immediately
- AckOoO(tabletId, step, msg->Transactions, ctx);
+ AckTransactions(tabletId, step, msg->Transactions, ctx);
return;
}
// Save possibly updated AckTo for later
- tabletEntry->MergeToOutOfOrder(&it->second, std::move(msg->Transactions));
+ tabletEntry->MergeOutOfOrder(&it->second, std::move(msg->Transactions));
}
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
@@ -264,8 +260,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
auto it = tabletEntry->Queue.begin();
while (it != tabletEntry->Queue.end()) {
TTabletEntry::TStep* sx = &it->second;
- tabletEntry->MergeOutOfOrder(sx);
- AckOoO(tabletId, sx->StepRef->Step, sx->Transactions, ctx);
+ AckTransactions(tabletId, sx->StepRef->Step, sx->Transactions, ctx);
// Note: will also auto-remove itself from sx->StepRef->TabletSteps
it = tabletEntry->Queue.erase(it);
}
@@ -288,7 +283,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
tabletEntry->State = TTabletEntry::EState::Connected;
for (auto& pr : tabletEntry->Queue) {
TTabletEntry::TStep* sx = &pr.second;
- tabletEntry->MergeOutOfOrder(sx);
SendToTablet(sx, tabletId, ctx);
}
}
@@ -364,10 +358,14 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
}
TTabletEntry::TStep* sx = &it->second;
- if (!sx->OutOfOrder.empty()) {
- // Confirm out-of-order requests
- AckOoO(tabletId, step, sx->OutOfOrder, ctx);
- }
+ // Note: in the past we didn't ack all transactions (except out-of-order
+ // requests when coordinator reconnected), since they are acknowledged
+ // by tablets. However, events from tablets to coordinator may be lost,
+ // and those transactions will never be acknowledged until the network
+ // between coordinator and mediator also flaps. In the future we might
+ // want to avoid acking transactions by tablets directly, so it doesn't
+ // introduce unnecessary interconnect chatter to single nodes.
+ AckTransactions(tabletId, step, sx->Transactions, ctx);
// Note: will also auto-remove itself from sx->StepRef->TabletSteps
tabletEntry->Queue.erase(it);
@@ -402,7 +400,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TimecastWatches.erase(ev->Sender);
}
- void AckOoO(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) {
+ void AckTransactions(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) {
TMap<TActorId, std::unique_ptr<TEvTxProcessing::TEvPlanStepAck>> acks;
for (const TTx &tx : transactions) {
auto& ack = acks[tx.AckTo];
@@ -758,26 +756,6 @@ private:
THashMap<TActorId, TGranularServer> GranularServers;
};
-/**
- * Returns true when all transactions of x are present in superset
- */
-static bool IsSubsetOf(const TVector<TTx>& x, const TVector<TTx>& superset) {
- auto it = x.begin();
- auto itSuperset = superset.begin();
- while (it != x.end()) {
- // Position superset to the lowerbound of the current TxId
- while (itSuperset != superset.end() && itSuperset->TxId < it->TxId) {
- ++itSuperset;
- }
- if (itSuperset == superset.end() || it->TxId != itSuperset->TxId) {
- return false;
- }
- ++it;
- ++itSuperset;
- }
- return true;
-}
-
static TString DumpTxIds(const TVector<TTx>& v) {
TStringBuilder stream;
stream << '{';
@@ -791,31 +769,38 @@ static TString DumpTxIds(const TVector<TTx>& v) {
return std::move(stream);
}
-void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx) {
- if (!sx->OutOfOrder.empty()) {
- // Since OutOfOrder is an update it might be missing some lost or
- // already acknowledged transactions that we don't have to resend.
- // We have previously validated that OutOfOrder ⊂ Transactions
- sx->Transactions = std::move(sx->OutOfOrder);
- }
-}
-
-void TTxMediatorTabletQueue::TTabletEntry::MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update) {
- // Update might be missing some lost or already acknowledged transactions
- // that we don't have to resend later. We validate that update is a subset
- // of a previously received step.
- const TVector<TTx>& prev = sx->OutOfOrder.empty() ? sx->Transactions : sx->OutOfOrder;
- if (Y_UNLIKELY(!IsSubsetOf(update, prev))) {
- // Coordinator shouldn't add new transaction to existing steps, so we
- // complain. However, even if that happens, it's ok for us to send
- // those transactions later, or never.
+void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx, TVector<TTx>&& update) {
+ // Step transactions are a union from multiple coordinators, and the update
+ // is currently unacknowledged transactions from a single coordinator.
+ // The update must be a subset of the full transaction list and cannot
+ // introduce new transactions out of thin air.
+ auto dst = sx->Transactions.begin();
+ auto src = update.begin();
+ bool subset = true;
+ while (dst != sx->Transactions.end() && src != update.end()) {
+ if (dst->TxId < src->TxId) {
+ ++dst;
+ continue;
+ }
+ if (Y_UNLIKELY(dst->TxId != src->TxId)) {
+ subset = false;
+ ++src;
+ continue;
+ }
+ dst->AckTo = src->AckTo;
+ ++dst;
+ ++src;
+ }
+ if (Y_UNLIKELY(!subset) || Y_UNLIKELY(src != update.end())) {
+ // Coordinators shouldn't add new transactions to existing steps, so we
+ // complain. Even if that happens, however, it's ok for us to send
+ // those transactions later, or never. Currently we don't.
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_MEDIATOR_TABLETQUEUE,
"Received out-of-order step " << sx->StepRef->Step
<< " for tablet " << TabletId
<< " with transactions " << DumpTxIds(update)
- << " which are not a subset of previously received " << DumpTxIds(prev));
+ << " which are not a subset of previously received " << DumpTxIds(sx->Transactions));
}
- sx->OutOfOrder = std::move(update);
}
}