diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2025-02-04 17:12:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-04 17:12:56 +0300 |
commit | e6e5842c8d256519bcb4ae856b4d6983b81ae0ab (patch) | |
tree | d20600a36680a212660a8f0c8923bdb511abe521 | |
parent | 60a898ffe0a3452ccc8cc320f2eb6a9804111e0d (diff) | |
download | ydb-e6e5842c8d256519bcb4ae856b4d6983b81ae0ab.tar.gz |
Fix mediator merge for out-of-order transactions (#14182)
-rw-r--r-- | ydb/core/tx/mediator/mediator_ut.cpp | 104 | ||||
-rw-r--r-- | ydb/core/tx/mediator/tablet_queue.cpp | 97 |
2 files changed, 138 insertions, 63 deletions
diff --git a/ydb/core/tx/mediator/mediator_ut.cpp b/ydb/core/tx/mediator/mediator_ut.cpp index 43b7a35720..2f956fe940 100644 --- a/ydb/core/tx/mediator/mediator_ut.cpp +++ b/ydb/core/tx/mediator/mediator_ut.cpp @@ -416,8 +416,20 @@ Y_UNIT_TEST_SUITE(MediatorTest) { } }; + struct TCoordinatorIndex { + size_t Value; + + explicit TCoordinatorIndex(size_t value) + : Value(value) + {} + }; + class TMediatorTestWithWatcher : public NUnitTest::TBaseFixture { public: + TMediatorTestWithWatcher(ui64 coordinatorCount = 1) + : CoordinatorCount(coordinatorCount) + {} + void SetUp(NUnitTest::TTestContext&) override { auto& pm = PM.emplace(); @@ -434,7 +446,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) { runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TABLETQUEUE, NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_PRIVATE, NLog::PRI_TRACE); - CoordinatorId = ChangeStateStorage(TTestTxConfig::Coordinator, Server->GetSettings().Domain); + for (ui64 i = 0; i < CoordinatorCount; ++i) { + CoordinatorIds.push_back(ChangeStateStorage(TTestTxConfig::Coordinator + i, Server->GetSettings().Domain)); + } MediatorId = ChangeStateStorage(TTestTxConfig::TxTablet0, Server->GetSettings().Domain); MediatorBootstrapper = CreateTestBootstrapper(runtime, @@ -451,7 +465,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) { auto msg = std::make_unique<TEvSubDomain::TEvConfigure>(); msg->Record.SetVersion(1); msg->Record.SetPlanResolution(500); - msg->Record.AddCoordinators(CoordinatorId); + for (ui64 coordinatorId : CoordinatorIds) { + msg->Record.AddCoordinators(coordinatorId); + } msg->Record.AddMediators(MediatorId); msg->Record.SetTimeCastBucketsPerMediator(1); runtime.SendToPipe(MediatorId, Sender, msg.release()); @@ -482,11 +498,11 @@ Y_UNIT_TEST_SUITE(MediatorTest) { return client; } - NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) { + NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui64 genCookie) { auto& runtime = GetRuntime(); TActorId client = QueuePipeClient(queue); - runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorId)); + runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorIds.at(coordinatorIndex.Value))); auto ev = runtime.GrabEdgeEventRethrow<TEvTxCoordinator::TEvCoordinatorSyncResult>(queue); auto* msg = ev->Get(); UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetCookie(), genCookie); @@ -494,13 +510,17 @@ Y_UNIT_TEST_SUITE(MediatorTest) { return std::move(msg->Record); } - void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) { + NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) { + return Sync(queue, TCoordinatorIndex(0), genCookie); + } + + void SendStep(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) { auto& runtime = GetRuntime(); TActorId client = QueuePipeClient(queue); // Note: prevStep is not actually used by mediator auto msg = std::make_unique<TEvTxCoordinator::TEvCoordinatorStep>( - step, /* prevStep */ 0, MediatorId, CoordinatorId, gen); + step, /* prevStep */ 0, MediatorId, CoordinatorIds.at(coordinatorIndex.Value), gen); size_t totalAffected = 0; for (auto& pr : txs) { auto* protoTx = msg->Record.AddTransactions(); @@ -514,6 +534,10 @@ Y_UNIT_TEST_SUITE(MediatorTest) { runtime.SendToPipe(client, queue, msg.release()); } + void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) { + return SendStep(queue, TCoordinatorIndex(0), gen, step, std::move(txs)); + } + ui64 AddTargetTablet() { auto& runtime = GetRuntime(); @@ -575,7 +599,8 @@ Y_UNIT_TEST_SUITE(MediatorTest) { TServer::TPtr Server; TActorId Sender; - ui64 CoordinatorId; + ui64 CoordinatorCount; + std::vector<ui64> CoordinatorIds; ui64 MediatorId; TActorId MediatorBootstrapper; @@ -589,6 +614,13 @@ Y_UNIT_TEST_SUITE(MediatorTest) { THashMap<TActorId, TActorId> PerQueuePipes; // Queue -> PipeClient }; + class TMediatorTestWithWatcherTwoCoordinators : public TMediatorTestWithWatcher { + public: + TMediatorTestWithWatcherTwoCoordinators() + : TMediatorTestWithWatcher(2) + {} + }; + Y_UNIT_TEST_F(BasicTimecastUpdates, TMediatorTestWithWatcher) { auto& runtime = GetRuntime(); @@ -1201,6 +1233,64 @@ Y_UNIT_TEST_SUITE(MediatorTest) { } } + Y_UNIT_TEST_F(OneCoordinatorResendTxNotLost, TMediatorTestWithWatcherTwoCoordinators) { + auto& runtime = GetRuntime(); + + ui64 tablet1 = AddTargetTablet(); + ui64 tablet2 = AddTargetTablet(); + AddWatchTablet(tablet1); + AddWatchTablet(tablet2); + WaitNoPending(); + WatcherState->Updates.clear(); + + auto queue1 = runtime.AllocateEdgeActor(); + Sync(queue1, TCoordinatorIndex(0), 1); + auto queue2 = runtime.AllocateEdgeActor(); + Sync(queue2, TCoordinatorIndex(1), 1); + + TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime); + + SendStep(queue1, TCoordinatorIndex(0), /* gen */ 1, /* step */ 1010, { + {1, {tablet1, tablet2}}, + }); + SendStep(queue2, TCoordinatorIndex(1), /* gen */ 1, /* step */ 1010, { + {2, {tablet1, tablet2}}, + }); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates( + TGranularUpdate(1010, {{tablet1, 1009}, {tablet2, 1009}}))); + WatcherState->Updates.clear(); + UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u); + blockedPlan.clear(); + + // Simulate one coordinator restarting and resending the step + auto queue3 = runtime.AllocateEdgeActor(); + Sync(queue3, TCoordinatorIndex(1), 2); + SendStep(queue3, TCoordinatorIndex(1), /* gen */ 2, /* step */ 1010, { + {2, {tablet1, tablet2}}, + }); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // Reboot tablets, we expect plans to be resent + RebootTablet(runtime, tablet1, Sender); + RebootTablet(runtime, tablet2, Sender); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates()); + UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u); + + // Tablets must see both transactions + // Note: the bug was causing some transactions to be lost + THashSet<ui64> observedTxIds; + for (auto& ev : blockedPlan) { + auto* msg = ev->Get(); + for (const auto& tx : msg->Record.GetTransactions()) { + observedTxIds.insert(tx.GetTxId()); + } + } + UNIT_ASSERT_VALUES_EQUAL(observedTxIds.size(), 2u); + } + } } // namespace NKikimr::NTxMediator diff --git a/ydb/core/tx/mediator/tablet_queue.cpp b/ydb/core/tx/mediator/tablet_queue.cpp index 2198d23a36..45df30cb0d 100644 --- a/ydb/core/tx/mediator/tablet_queue.cpp +++ b/ydb/core/tx/mediator/tablet_queue.cpp @@ -35,9 +35,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { TStepEntry* const StepRef; // The current list of transactions TVector<TTx> Transactions; - // An updated list of transactions (after last sent to tablet) - // We may need to send acks to the updated AckTo actor - TVector<TTx> OutOfOrder; TStep(TStepEntry* stepRef) : StepRef(stepRef) @@ -60,8 +57,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { return State == EState::Init && Queue.empty() && Watchers.empty(); } - void MergeOutOfOrder(TStep* sx); - void MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update); + void MergeOutOfOrder(TStep* sx, TVector<TTx>&& update); }; struct TStepEntry { @@ -230,19 +226,19 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { TTabletEntry* tabletEntry = Tablets.FindPtr(tabletId); if (tabletEntry == nullptr) { // We don't have a tablet entry, which means no steps inflight - AckOoO(tabletId, step, msg->Transactions, ctx); + AckTransactions(tabletId, step, msg->Transactions, ctx); return; } auto it = tabletEntry->Queue.find(step); if (it == tabletEntry->Queue.end()) { // This step is already confirmed, reply immediately - AckOoO(tabletId, step, msg->Transactions, ctx); + AckTransactions(tabletId, step, msg->Transactions, ctx); return; } // Save possibly updated AckTo for later - tabletEntry->MergeToOutOfOrder(&it->second, std::move(msg->Transactions)); + tabletEntry->MergeOutOfOrder(&it->second, std::move(msg->Transactions)); } void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) { @@ -264,8 +260,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { auto it = tabletEntry->Queue.begin(); while (it != tabletEntry->Queue.end()) { TTabletEntry::TStep* sx = &it->second; - tabletEntry->MergeOutOfOrder(sx); - AckOoO(tabletId, sx->StepRef->Step, sx->Transactions, ctx); + AckTransactions(tabletId, sx->StepRef->Step, sx->Transactions, ctx); // Note: will also auto-remove itself from sx->StepRef->TabletSteps it = tabletEntry->Queue.erase(it); } @@ -288,7 +283,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { tabletEntry->State = TTabletEntry::EState::Connected; for (auto& pr : tabletEntry->Queue) { TTabletEntry::TStep* sx = &pr.second; - tabletEntry->MergeOutOfOrder(sx); SendToTablet(sx, tabletId, ctx); } } @@ -364,10 +358,14 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { } TTabletEntry::TStep* sx = &it->second; - if (!sx->OutOfOrder.empty()) { - // Confirm out-of-order requests - AckOoO(tabletId, step, sx->OutOfOrder, ctx); - } + // Note: in the past we didn't ack all transactions (except out-of-order + // requests when coordinator reconnected), since they are acknowledged + // by tablets. However, events from tablets to coordinator may be lost, + // and those transactions will never be acknowledged until the network + // between coordinator and mediator also flaps. In the future we might + // want to avoid acking transactions by tablets directly, so it doesn't + // introduce unnecessary interconnect chatter to single nodes. + AckTransactions(tabletId, step, sx->Transactions, ctx); // Note: will also auto-remove itself from sx->StepRef->TabletSteps tabletEntry->Queue.erase(it); @@ -402,7 +400,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { TimecastWatches.erase(ev->Sender); } - void AckOoO(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) { + void AckTransactions(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) { TMap<TActorId, std::unique_ptr<TEvTxProcessing::TEvPlanStepAck>> acks; for (const TTx &tx : transactions) { auto& ack = acks[tx.AckTo]; @@ -758,26 +756,6 @@ private: THashMap<TActorId, TGranularServer> GranularServers; }; -/** - * Returns true when all transactions of x are present in superset - */ -static bool IsSubsetOf(const TVector<TTx>& x, const TVector<TTx>& superset) { - auto it = x.begin(); - auto itSuperset = superset.begin(); - while (it != x.end()) { - // Position superset to the lowerbound of the current TxId - while (itSuperset != superset.end() && itSuperset->TxId < it->TxId) { - ++itSuperset; - } - if (itSuperset == superset.end() || it->TxId != itSuperset->TxId) { - return false; - } - ++it; - ++itSuperset; - } - return true; -} - static TString DumpTxIds(const TVector<TTx>& v) { TStringBuilder stream; stream << '{'; @@ -791,31 +769,38 @@ static TString DumpTxIds(const TVector<TTx>& v) { return std::move(stream); } -void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx) { - if (!sx->OutOfOrder.empty()) { - // Since OutOfOrder is an update it might be missing some lost or - // already acknowledged transactions that we don't have to resend. - // We have previously validated that OutOfOrder ⊂ Transactions - sx->Transactions = std::move(sx->OutOfOrder); - } -} - -void TTxMediatorTabletQueue::TTabletEntry::MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update) { - // Update might be missing some lost or already acknowledged transactions - // that we don't have to resend later. We validate that update is a subset - // of a previously received step. - const TVector<TTx>& prev = sx->OutOfOrder.empty() ? sx->Transactions : sx->OutOfOrder; - if (Y_UNLIKELY(!IsSubsetOf(update, prev))) { - // Coordinator shouldn't add new transaction to existing steps, so we - // complain. However, even if that happens, it's ok for us to send - // those transactions later, or never. +void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx, TVector<TTx>&& update) { + // Step transactions are a union from multiple coordinators, and the update + // is currently unacknowledged transactions from a single coordinator. + // The update must be a subset of the full transaction list and cannot + // introduce new transactions out of thin air. + auto dst = sx->Transactions.begin(); + auto src = update.begin(); + bool subset = true; + while (dst != sx->Transactions.end() && src != update.end()) { + if (dst->TxId < src->TxId) { + ++dst; + continue; + } + if (Y_UNLIKELY(dst->TxId != src->TxId)) { + subset = false; + ++src; + continue; + } + dst->AckTo = src->AckTo; + ++dst; + ++src; + } + if (Y_UNLIKELY(!subset) || Y_UNLIKELY(src != update.end())) { + // Coordinators shouldn't add new transactions to existing steps, so we + // complain. Even if that happens, however, it's ok for us to send + // those transactions later, or never. Currently we don't. LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_MEDIATOR_TABLETQUEUE, "Received out-of-order step " << sx->StepRef->Step << " for tablet " << TabletId << " with transactions " << DumpTxIds(update) - << " which are not a subset of previously received " << DumpTxIds(prev)); + << " which are not a subset of previously received " << DumpTxIds(sx->Transactions)); } - sx->OutOfOrder = std::move(update); } } |