diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-23 16:14:07 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-23 16:14:07 +0300 |
commit | fb769e1561a6939b3033079c9f3a634bc0f390eb (patch) | |
tree | 3f8d6ee2b24436819243aace63b3653daaebbfb7 | |
parent | 439ee34959ae6a27dc5ae8cc654804ef0d7eb280 (diff) | |
download | ydb-fb769e1561a6939b3033079c9f3a634bc0f390eb.tar.gz |
Improve leader leases, KIKIMR-15178
ref:6c3a3869d4bd27d7a0eef3c5401d593a3b4b706e
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 148 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.h | 8 |
2 files changed, 97 insertions, 59 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 844333096f8..7e17797173c 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -414,6 +414,8 @@ void TExecutor::Active(const TActorContext &ctx) { LeaseDuration = Owner->ReadOnlyLeaseDuration(); if (!LeaseDuration) { LeaseEnabled = false; + } else { + LeaseDurationUpdated = true; } } @@ -1403,6 +1405,47 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } } +TExecutor::TLeaseCommit* TExecutor::AttachLeaseCommit(TLogCommit* commit, bool force) { + if (!LeaseEnabled || Y_UNLIKELY(LeaseDropped)) { + return nullptr; + } + + if (force || LeaseDurationUpdated) { + NKikimrExecutorFlat::TLeaseInfoMetadata proto; + ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); + proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); + + TString data; + bool ok = proto.SerializeToString(&data); + Y_VERIFY(ok); + + commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); + LeaseDurationUpdated = false; + } + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + TLeaseCommit* lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + + // It may happen in the future that LeaseDuration is decreased by this + // commit, in which case new leader might read and use it, and may not wait + // longer than the new LeaseEnd. If there are commits currently in flight + // make sure to truncate their lease extensions to the new LeaseEnd. + if (force || LeaseDurationUpdated) { + auto it = LeaseCommitsByEnd.upper_bound(lease->LeaseEnd); + while (it != LeaseCommitsByEnd.end()) { + TLeaseCommit* other = it->second; + it = LeaseCommitsByEnd.erase(it); + other->LeaseEnd = lease->LeaseEnd; + other->ByEndIterator = LeaseCommitsByEnd.emplace(other->LeaseEnd, other); + } + // Currently confirmed lease may become truncated as well + LeaseEnd = Min(LeaseEnd, lease->LeaseEnd); + } + + lease->ByEndIterator = LeaseCommitsByEnd.emplace(lease->LeaseEnd, lease); + return lease; +} + TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) { Y_VERIFY(Stats->IsActive && !Stats->IsFollower); Y_VERIFY(at >= LeaseEnd); @@ -1412,40 +1455,22 @@ TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) { LeaseEnabled = true; LeaseDuration = Owner->ReadOnlyLeaseDuration(); Y_VERIFY(LeaseDuration); + LeaseDurationUpdated = true; } - // Try to find a suitable commit that is already in flight TLeaseCommit* lease = nullptr; - for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) { - if (at < it->LeaseEnd) { - lease = &*it; - } else { - break; - } - } - - if (!lease) { - if (LeaseDropped) { - // We cannot start new lease confirmations - return nullptr; - } + // Try to find a suitable commit that is already in flight + // This would be the first commit where at < LeaseEnd + auto itAfter = LeaseCommitsByEnd.upper_bound(at); + if (itAfter != LeaseCommitsByEnd.end()) { + lease = itAfter->second; + } else if (!LeaseDropped) { LogicRedo->FlushBatchedLog(); auto commit = CommitManager->Begin(true, ECommit::Misc); - NKikimrExecutorFlat::TLeaseInfoMetadata proto; - ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); - proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); - - TString data; - bool ok = proto.SerializeToString(&data); - Y_VERIFY(ok); - - commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); - - TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); - lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + lease = AttachLeaseCommit(commit.Get(), /* force */ true); CommitManager->Commit(commit); @@ -2317,8 +2342,10 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv GcLogic->HoldBarrier(barrier->Step); } - if (commitResult.Commit) + if (commitResult.Commit) { + AttachLeaseCommit(commitResult.Commit.Get()); CommitManager->Commit(commitResult.Commit); + } for (auto &affectedTable : change->Affects) CompactionLogic->UpdateInMemStatsStep(affectedTable, 1, Database->GetTableMemSize(affectedTable)); @@ -2482,21 +2509,7 @@ void TExecutor::MakeLogSnapshot() { GcLogic->SnapToLog(snap, commit->Step); LogicSnap->MakeSnap(snap, *commit, Logger.Get()); - if (LeaseEnabled) { - NKikimrExecutorFlat::TLeaseInfoMetadata proto; - ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); - proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); - - TString data; - bool ok = proto.SerializeToString(&data); - Y_VERIFY(ok); - - commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); - - TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); - LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); - } - + AttachLeaseCommit(commit.Get(), /* force */ true); CommitManager->Commit(commit); CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage()); @@ -2720,7 +2733,11 @@ void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &c LeaseEnd = Min(LeaseEnd, ts); for (auto& l : LeaseCommits) { - l.LeaseEnd = Min(l.LeaseEnd, ts); + if (l.LeaseEnd > ts) { + LeaseCommitsByEnd.erase(l.ByEndIterator); + l.LeaseEnd = ts; + l.ByEndIterator = LeaseCommitsByEnd.emplace(l.LeaseEnd, &l); + } } ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped); @@ -2735,6 +2752,15 @@ void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext & return; } + // It is possible lease was extended while this event was pending + TMonotonic now = TActivationContext::Monotonic(); + TMonotonic deadline = LeaseEnd - LeaseDuration / 3; + if (now < deadline) { + Schedule(deadline, new TEvPrivate::TEvLeaseExtend); + LeaseExtendPending = true; + return; + } + if (LeaseUsed) { LeaseUsed = false; UnusedLeaseExtensions = 0; @@ -2781,24 +2807,25 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext LeasePersisted = true; LeaseEnd = Max(LeaseEnd, l.LeaseEnd); - while (!l.Callbacks.empty()) { - // Note: callback may (though unlikely) recursively add more callbacks - TVector<std::function<void()>> callbacks; - callbacks.swap(l.Callbacks); - for (auto& callback : callbacks) { - callback(); - } - } - + auto callbacks = std::move(l.Callbacks); + LeaseCommitsByEnd.erase(l.ByEndIterator); LeaseCommits.pop_front(); - // Calculate a full round-trip latency for leases - // When this latency is larger than third of lease duration we want - // to increase lease duration so we would have enough time for - // processing read-only requests without additional commits - TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); - if ((LeaseEnd - ts) < LeaseDuration / 3) { - LeaseDuration *= 2; + for (auto& callback : callbacks) { + callback(); + } + + if (LeaseDurationIncreases < 2 && LeaseCommits.empty()) { + // Calculate how much of a lease is left after a full round trip + // When we are left with less than a third of lease duration we want + // to increase lease duration so we would have enough time for + // processing read-only requests without additional commits + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + if ((LeaseEnd - ts) < LeaseDuration / 3) { + LeaseDuration *= 2; + LeaseDurationUpdated = true; + ++LeaseDurationIncreases; + } } // We want to schedule a new commit before the lease expires @@ -2918,6 +2945,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable:: TIntrusivePtr<TBarrier> barrier = new TBarrier(commit->Step); + AttachLeaseCommit(commit.Get()); CommitManager->Commit(commit); TAutoPtr<NTable::TSubset> subset; @@ -3351,6 +3379,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) Y_VERIFY(InFlyCompactionGcBarriers.emplace(commit->Step, ops->Barrier).second); + AttachLeaseCommit(commit.Get()); CommitManager->Commit(commit); if (hadFrozen || logicResult.MemCompacted) @@ -4504,6 +4533,7 @@ void TExecutor::CommitCompactionChanges( Y_UNUSED(glob); } + AttachLeaseCommit(commit.Get()); CommitManager->Commit(commit); } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index b9c6753bf03..ab6116c05b0 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -35,6 +35,7 @@ #include <util/system/hp_timer.h> #include <util/thread/singleton.h> +#include <map> #include <optional> #include <variant> @@ -369,16 +370,21 @@ class TExecutor bool LeaseUsed = false; // This flag marks when TEvLeaseExtend message is already pending bool LeaseExtendPending = false; + // This flag is enabled when LeaseDuration is changed and needs to be persisted again + bool LeaseDurationUpdated = false; TDuration LeaseDuration; TMonotonic LeaseEnd; // Counts the number of times an unused lease has been extended size_t UnusedLeaseExtensions = 0; + // Counts the number of times LeaseDuration was increased + size_t LeaseDurationIncreases = 0; struct TLeaseCommit { const ui32 Step; const TMonotonic Start; TMonotonic LeaseEnd; TVector<std::function<void()>> Callbacks; + std::multimap<TMonotonic, TLeaseCommit*>::iterator ByEndIterator; TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd) : Step(step) @@ -388,6 +394,7 @@ class TExecutor }; TList<TLeaseCommit> LeaseCommits; + std::multimap<TMonotonic, TLeaseCommit*> LeaseCommitsByEnd; using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>; THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue; @@ -610,6 +617,7 @@ public: void DetachTablet(const TActorContext &ctx) override; void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override; + TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false); TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); void ConfirmReadOnlyLease(TMonotonic at) override; void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override; |