aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Aleksei Borzenkov <snaury@gmail.com> 2022-06-23 16:14:07 +0300
committer Aleksei Borzenkov <snaury@gmail.com> 2022-06-23 16:14:07 +0300
commit fb769e1561a6939b3033079c9f3a634bc0f390eb (patch)
tree 3f8d6ee2b24436819243aace63b3653daaebbfb7
parent 439ee34959ae6a27dc5ae8cc654804ef0d7eb280 (diff)
download ydb-fb769e1561a6939b3033079c9f3a634bc0f390eb.tar.gz
Improve leader leases, KIKIMR-15178
ref:6c3a3869d4bd27d7a0eef3c5401d593a3b4b706e
-rw-r--r-- ydb/core/tablet_flat/flat_executor.cpp 148
-rw-r--r-- ydb/core/tablet_flat/flat_executor.h 8
2 files changed, 97 insertions, 59 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 844333096f8..7e17797173c 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -414,6 +414,8 @@ void TExecutor::Active(const TActorContext &ctx) {
LeaseDuration = Owner->ReadOnlyLeaseDuration();
if (!LeaseDuration) {
LeaseEnabled = false;
+ } else {
+ LeaseDurationUpdated = true;
}
}
@@ -1403,6 +1405,47 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
}
+// Registers `commit` as an in-flight read-only lease confirmation.
+//
+// A TLeaseCommit is appended to LeaseCommits for commit->Step, covering the
+// interval [now, now + LeaseDuration], and is indexed in LeaseCommitsByEnd.
+// When `force` is set, or LeaseDuration has changed since it was last written
+// (LeaseDurationUpdated), a serialized TLeaseInfoMetadata (lease holder and
+// lease duration) is also attached to the commit's metadata.
+//
+// Returns the new TLeaseCommit, or nullptr when leases are disabled or the
+// lease has been dropped (in which case `commit` is left untouched).
+TExecutor::TLeaseCommit* TExecutor::AttachLeaseCommit(TLogCommit* commit, bool force) {
+    if (!LeaseEnabled || Y_UNLIKELY(LeaseDropped)) {
+        return nullptr;
+    }
+
+    // Latch the decision up front: LeaseDurationUpdated is cleared once the
+    // metadata is attached, but the truncation step further down must still
+    // know that this commit persists lease info.
+    const bool persistLeaseInfo = force || LeaseDurationUpdated;
+
+    if (persistLeaseInfo) {
+        NKikimrExecutorFlat::TLeaseInfoMetadata proto;
+        ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
+        proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
+
+        TString data;
+        bool ok = proto.SerializeToString(&data);
+        Y_VERIFY(ok);
+
+        commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
+        LeaseDurationUpdated = false;
+    }
+
+    TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+    TLeaseCommit* lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+
+    // It may happen in the future that LeaseDuration is decreased by this
+    // commit, in which case new leader might read and use it, and may not wait
+    // longer than the new LeaseEnd. If there are commits currently in flight
+    // make sure to truncate their lease extensions to the new LeaseEnd.
+    if (persistLeaseInfo) {
+        // Every entry strictly past the new LeaseEnd is re-keyed to it. The
+        // re-inserted key equals lease->LeaseEnd, so it lands before `it`
+        // (which erase() already advanced) and is never visited again.
+        auto it = LeaseCommitsByEnd.upper_bound(lease->LeaseEnd);
+        while (it != LeaseCommitsByEnd.end()) {
+            TLeaseCommit* other = it->second;
+            it = LeaseCommitsByEnd.erase(it);
+            other->LeaseEnd = lease->LeaseEnd;
+            other->ByEndIterator = LeaseCommitsByEnd.emplace(other->LeaseEnd, other);
+        }
+        // Currently confirmed lease may become truncated as well
+        LeaseEnd = Min(LeaseEnd, lease->LeaseEnd);
+    }
+
+    // The new lease is indexed last so the truncation loop above never sees it.
+    lease->ByEndIterator = LeaseCommitsByEnd.emplace(lease->LeaseEnd, lease);
+    return lease;
+}
+
+
TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) {
Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
Y_VERIFY(at >= LeaseEnd);
@@ -1412,40 +1455,22 @@ TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) {
LeaseEnabled = true;
LeaseDuration = Owner->ReadOnlyLeaseDuration();
Y_VERIFY(LeaseDuration);
+ LeaseDurationUpdated = true;
}
- // Try to find a suitable commit that is already in flight
TLeaseCommit* lease = nullptr;
- for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) {
- if (at < it->LeaseEnd) {
- lease = &*it;
- } else {
- break;
- }
- }
-
- if (!lease) {
- if (LeaseDropped) {
- // We cannot start new lease confirmations
- return nullptr;
- }
+ // Try to find a suitable commit that is already in flight
+ // This would be the first commit where at < LeaseEnd
+ auto itAfter = LeaseCommitsByEnd.upper_bound(at);
+ if (itAfter != LeaseCommitsByEnd.end()) {
+ lease = itAfter->second;
+ } else if (!LeaseDropped) {
LogicRedo->FlushBatchedLog();
auto commit = CommitManager->Begin(true, ECommit::Misc);
- NKikimrExecutorFlat::TLeaseInfoMetadata proto;
- ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
- proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
-
- TString data;
- bool ok = proto.SerializeToString(&data);
- Y_VERIFY(ok);
-
- commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
-
- TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
- lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+ lease = AttachLeaseCommit(commit.Get(), /* force */ true);
CommitManager->Commit(commit);
@@ -2317,8 +2342,10 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
GcLogic->HoldBarrier(barrier->Step);
}
- if (commitResult.Commit)
+ if (commitResult.Commit) {
+ AttachLeaseCommit(commitResult.Commit.Get());
CommitManager->Commit(commitResult.Commit);
+ }
for (auto &affectedTable : change->Affects)
CompactionLogic->UpdateInMemStatsStep(affectedTable, 1, Database->GetTableMemSize(affectedTable));
@@ -2482,21 +2509,7 @@ void TExecutor::MakeLogSnapshot() {
GcLogic->SnapToLog(snap, commit->Step);
LogicSnap->MakeSnap(snap, *commit, Logger.Get());
- if (LeaseEnabled) {
- NKikimrExecutorFlat::TLeaseInfoMetadata proto;
- ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
- proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
-
- TString data;
- bool ok = proto.SerializeToString(&data);
- Y_VERIFY(ok);
-
- commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
-
- TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
- LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
- }
-
+ AttachLeaseCommit(commit.Get(), /* force */ true);
CommitManager->Commit(commit);
CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());
@@ -2720,7 +2733,11 @@ void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &c
LeaseEnd = Min(LeaseEnd, ts);
for (auto& l : LeaseCommits) {
- l.LeaseEnd = Min(l.LeaseEnd, ts);
+ if (l.LeaseEnd > ts) {
+ LeaseCommitsByEnd.erase(l.ByEndIterator);
+ l.LeaseEnd = ts;
+ l.ByEndIterator = LeaseCommitsByEnd.emplace(l.LeaseEnd, &l);
+ }
}
ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped);
@@ -2735,6 +2752,15 @@ void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &
return;
}
+ // It is possible lease was extended while this event was pending
+ TMonotonic now = TActivationContext::Monotonic();
+ TMonotonic deadline = LeaseEnd - LeaseDuration / 3;
+ if (now < deadline) {
+ Schedule(deadline, new TEvPrivate::TEvLeaseExtend);
+ LeaseExtendPending = true;
+ return;
+ }
+
if (LeaseUsed) {
LeaseUsed = false;
UnusedLeaseExtensions = 0;
@@ -2781,24 +2807,25 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
LeasePersisted = true;
LeaseEnd = Max(LeaseEnd, l.LeaseEnd);
- while (!l.Callbacks.empty()) {
- // Note: callback may (though unlikely) recursively add more callbacks
- TVector<std::function<void()>> callbacks;
- callbacks.swap(l.Callbacks);
- for (auto& callback : callbacks) {
- callback();
- }
- }
-
+ auto callbacks = std::move(l.Callbacks);
+ LeaseCommitsByEnd.erase(l.ByEndIterator);
LeaseCommits.pop_front();
- // Calculate a full round-trip latency for leases
- // When this latency is larger than third of lease duration we want
- // to increase lease duration so we would have enough time for
- // processing read-only requests without additional commits
- TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
- if ((LeaseEnd - ts) < LeaseDuration / 3) {
- LeaseDuration *= 2;
+ for (auto& callback : callbacks) {
+ callback();
+ }
+
+ if (LeaseDurationIncreases < 2 && LeaseCommits.empty()) {
+ // Calculate how much of a lease is left after a full round trip
+ // When we are left with less than a third of lease duration we want
+ // to increase lease duration so we would have enough time for
+ // processing read-only requests without additional commits
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ if ((LeaseEnd - ts) < LeaseDuration / 3) {
+ LeaseDuration *= 2;
+ LeaseDurationUpdated = true;
+ ++LeaseDurationIncreases;
+ }
}
// We want to schedule a new commit before the lease expires
@@ -2918,6 +2945,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
TIntrusivePtr<TBarrier> barrier = new TBarrier(commit->Step);
+ AttachLeaseCommit(commit.Get());
CommitManager->Commit(commit);
TAutoPtr<NTable::TSubset> subset;
@@ -3351,6 +3379,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
Y_VERIFY(InFlyCompactionGcBarriers.emplace(commit->Step, ops->Barrier).second);
+ AttachLeaseCommit(commit.Get());
CommitManager->Commit(commit);
if (hadFrozen || logicResult.MemCompacted)
@@ -4504,6 +4533,7 @@ void TExecutor::CommitCompactionChanges(
Y_UNUSED(glob);
}
+ AttachLeaseCommit(commit.Get());
CommitManager->Commit(commit);
}
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index b9c6753bf03..ab6116c05b0 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -35,6 +35,7 @@
#include <util/system/hp_timer.h>
#include <util/thread/singleton.h>
+#include <map>
#include <optional>
#include <variant>
@@ -369,16 +370,21 @@ class TExecutor
bool LeaseUsed = false;
// This flag marks when TEvLeaseExtend message is already pending
bool LeaseExtendPending = false;
+ // This flag is enabled when LeaseDuration is changed and needs to be persisted again
+ bool LeaseDurationUpdated = false;
TDuration LeaseDuration;
TMonotonic LeaseEnd;
// Counts the number of times an unused lease has been extended
size_t UnusedLeaseExtensions = 0;
+ // Counts the number of times LeaseDuration was increased
+ size_t LeaseDurationIncreases = 0;
struct TLeaseCommit {
const ui32 Step;
const TMonotonic Start;
TMonotonic LeaseEnd;
TVector<std::function<void()>> Callbacks;
+ std::multimap<TMonotonic, TLeaseCommit*>::iterator ByEndIterator;
TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd)
: Step(step)
@@ -388,6 +394,7 @@ class TExecutor
};
TList<TLeaseCommit> LeaseCommits;
+ std::multimap<TMonotonic, TLeaseCommit*> LeaseCommitsByEnd;
using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>;
THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue;
@@ -610,6 +617,7 @@ public:
void DetachTablet(const TActorContext &ctx) override;
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
+ TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false);
TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
void ConfirmReadOnlyLease(TMonotonic at) override;
void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override;