aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/base/tablet.h91
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator.cpp57
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator.h3
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator_ut.cpp3
-rw-r--r--ydb/core/cms/json_proxy_proto.h2
-rw-r--r--ydb/core/control/immediate_control_board_control.cpp7
-rw-r--r--ydb/core/control/immediate_control_board_control.h1
-rw-r--r--ydb/core/control/immediate_control_board_impl.cpp10
-rw-r--r--ydb/core/control/immediate_control_board_impl.h2
-rw-r--r--ydb/core/control/immediate_control_board_wrapper.h9
-rw-r--r--ydb/core/protos/config.proto14
-rw-r--r--ydb/core/protos/counters_coordinator.proto2
-rw-r--r--ydb/core/protos/tablet.proto14
-rw-r--r--ydb/core/tablet/tablet_req_rebuildhistory.cpp17
-rw-r--r--ydb/core/tablet/tablet_sys.cpp10
-rw-r--r--ydb/core/tablet_flat/CMakeLists.txt1
-rw-r--r--ydb/core/tablet_flat/flat_boot_cookie.h4
-rw-r--r--ydb/core/tablet_flat/flat_boot_lease.cpp110
-rw-r--r--ydb/core/tablet_flat/flat_exec_commit.h2
-rw-r--r--ydb/core/tablet_flat/flat_exec_commit_mgr.h1
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp180
-rw-r--r--ydb/core/tablet_flat/flat_executor.h40
-rw-r--r--ydb/core/tablet_flat/flat_executor.proto11
-rw-r--r--ydb/core/tablet_flat/flat_executor_bootlogic.cpp25
-rw-r--r--ydb/core/tablet_flat/flat_executor_bootlogic.h4
-rw-r--r--ydb/core/tablet_flat/flat_executor_leases_ut.cpp365
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.cpp12
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h8
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux.txt2
-rw-r--r--ydb/core/testlib/test_client.cpp6
-rw-r--r--ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp16
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.cpp23
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.h14
34 files changed, 1000 insertions, 68 deletions
diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h
index 602e39c600..3b97f4fe09 100644
--- a/ydb/core/base/tablet.h
+++ b/ydb/core/base/tablet.h
@@ -52,6 +52,7 @@ struct TEvTablet {
EvFollowerSyncComplete, // from leader to user tablet when all old followers are touched and synced
EvCutTabletHistory,
EvUpdateConfig,
+ EvDropLease,
EvCommit = EvBoot + 512,
EvAux,
@@ -60,6 +61,7 @@ struct TEvTablet {
EvTabletActive,
EvPromoteToLeader,
EvFGcAck, // from user tablet to follower
+ EvLeaseDropped,
EvTabletDead = EvBoot + 1024,
EvFollowerUpdateState, // notifications to guardian
@@ -101,6 +103,16 @@ struct TEvTablet {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET)");
+ struct TCommitMetadata {
+ ui32 Key;
+ TString Data;
+
+ TCommitMetadata(ui32 key, TString data)
+ : Key(key)
+ , Data(std::move(data))
+ { }
+ };
+
struct TDependencyGraph : public TThrRefBase {
struct TEntry {
std::pair<ui32, ui32> Id;
@@ -111,28 +123,29 @@ struct TEvTablet {
TVector<TLogoBlobID> GcLeft;
TString EmbeddedLogBody;
-
- TEntry()
- : IsSnapshot(false)
- {}
-
- void Set(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &refs, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Id = id;
- References.swap(refs);
- IsSnapshot = isSnapshot;
- GcDiscovered.swap(gcDiscovered);
- GcLeft.swap(gcLeft);
- EmbeddedLogBody.clear();
- }
-
- void Set(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Id = id;
- References.clear();
- IsSnapshot = false;
- GcDiscovered.swap(gcDiscovered);
- GcLeft.swap(gcLeft);
- EmbeddedLogBody = embeddedLogBody;
- }
+ TVector<TCommitMetadata> EmbeddedMetadata;
+
+ TEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ : Id(id)
+ , IsSnapshot(isSnapshot)
+ , References(std::move(refs))
+ , GcDiscovered(std::move(gcDiscovered))
+ , GcLeft(std::move(gcLeft))
+ , EmbeddedMetadata(std::move(metadata))
+ { }
+
+ TEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ : Id(id)
+ , IsSnapshot(false)
+ , GcDiscovered(std::move(gcDiscovered))
+ , GcLeft(std::move(gcLeft))
+ , EmbeddedLogBody(std::move(embeddedLogBody))
+ , EmbeddedMetadata(std::move(metadata))
+ { }
};
std::pair<ui32, ui32> Snapshot;
@@ -148,19 +161,23 @@ struct TEvTablet {
<< " entries " << Entries.size() << "}";
}
- void AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &references, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
+ TEntry& AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ {
if (isSnapshot) {
Snapshot = id;
Entries.clear();
}
- Entries.push_back(TEntry());
- Entries.back().Set(id, references, isSnapshot, gcDiscovered, gcLeft);
+ return Entries.emplace_back(id, std::move(refs), isSnapshot, std::move(gcDiscovered), std::move(gcLeft), std::move(metadata));
}
- void AddEntry(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Entries.push_back(TEntry());
- Entries.back().Set(id, embeddedLogBody, gcDiscovered, gcLeft);
+ TEntry& AddEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ {
+ return Entries.emplace_back(id, std::move(embeddedLogBody), std::move(gcDiscovered), std::move(gcLeft), std::move(metadata));
}
void Invalidate() {
@@ -269,6 +286,8 @@ struct TEvTablet {
TString EmbeddedLogBody;
TString FollowerAux;
+ TVector<TCommitMetadata> EmbeddedMetadata;
+
TEvCommit(ui64 tabletId, ui32 gen, ui32 step, const TVector<ui32> &dependsOn, bool isSnapshot
, bool preCommited = false
, TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticMinLatency)
@@ -777,6 +796,22 @@ struct TEvTablet {
};
struct TEvTabletStopped : TEventLocal<TEvTabletStopped, EvTabletStopped> {};
+
+ struct TEvDropLease : TEventPB<TEvDropLease, NKikimrTabletBase::TEvDropLease, EvDropLease> {
+ TEvDropLease() = default;
+
+ explicit TEvDropLease(ui64 tabletId) {
+ Record.SetTabletID(tabletId);
+ }
+ };
+
+ struct TEvLeaseDropped : TEventPB<TEvLeaseDropped, NKikimrTabletBase::TEvLeaseDropped, EvLeaseDropped> {
+ TEvLeaseDropped() = default;
+
+ explicit TEvLeaseDropped(ui64 tabletId) {
+ Record.SetTabletID(tabletId);
+ }
+ };
};
IActor* CreateTabletKiller(ui64 tabletId, ui32 nodeId = 0, ui32 maxGeneration = Max<ui32>());
diff --git a/ydb/core/cms/console/immediate_controls_configurator.cpp b/ydb/core/cms/console/immediate_controls_configurator.cpp
index af3ad24217..4f959a1891 100644
--- a/ydb/core/cms/console/immediate_controls_configurator.cpp
+++ b/ydb/core/cms/console/immediate_controls_configurator.cpp
@@ -18,7 +18,8 @@ public:
}
TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg);
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls);
void Bootstrap(const TActorContext &ctx);
@@ -38,13 +39,15 @@ public:
}
private:
- void CreateControls(TIntrusivePtr<TControlBoard> board);
+ void CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting);
void CreateControls(TIntrusivePtr<TControlBoard> board,
const google::protobuf::Descriptor *desc,
- const TString &prefix);
+ const TString &prefix,
+ bool allowExisting);
void AddControl(TIntrusivePtr<TControlBoard> board,
const google::protobuf::FieldDescriptor *desc,
- const TString &prefix);
+ const TString &prefix,
+ bool allowExisting);
void ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg,
TIntrusivePtr<TControlBoard> board);
void ApplyConfig(const ::google::protobuf::Message &cfg,
@@ -57,9 +60,10 @@ private:
};
TImmediateControlsConfigurator::TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg)
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls)
{
- CreateControls(board);
+ CreateControls(board, allowExistingControls);
ApplyConfig(cfg, board);
}
@@ -98,15 +102,16 @@ void TImmediateControlsConfigurator::Handle(TEvConsole::TEvConfigNotificationReq
ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie);
}
-void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board)
+void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting)
{
auto *desc = NKikimrConfig::TImmediateControlsConfig::descriptor();
- CreateControls(board, desc, "");
+ CreateControls(board, desc, "", allowExisting);
}
void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board,
const google::protobuf::Descriptor *desc,
- const TString &prefix)
+ const TString &prefix,
+ bool allowExisting)
{
for (int i = 0; i < desc->field_count(); ++i) {
auto *fieldDesc = desc->field(i);
@@ -117,20 +122,22 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard>
auto fieldType = fieldDesc->type();
if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64
|| fieldType == google::protobuf::FieldDescriptor::TYPE_INT64)
- AddControl(board, fieldDesc, prefix);
+ AddControl(board, fieldDesc, prefix, allowExisting);
else {
Y_VERIFY(fieldType == google::protobuf::FieldDescriptor::TYPE_MESSAGE,
"Only [u]int64 and message fields are allowed in Immediate Controls Config");
CreateControls(board, fieldDesc->message_type(),
- MakePrefix(prefix, fieldDesc->name()));
+ MakePrefix(prefix, fieldDesc->name()),
+ allowExisting);
}
}
}
void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> board,
const google::protobuf::FieldDescriptor *desc,
- const TString &prefix)
+ const TString &prefix,
+ bool allowExisting)
{
auto &opts = desc->options().GetExtension(NKikimrConfig::ControlOptions);
auto name = MakePrefix(prefix, desc->name());
@@ -139,11 +146,22 @@ void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> boa
ui64 maxValue = opts.GetMaxValue();
Controls[name] = TControlWrapper(defaultValue, minValue, maxValue);
- // All controls are actually shared but use RegisterLocalControl to
- // make sure it is not registered by someone else with other options.
- auto res = board->RegisterLocalControl(Controls[name], name);
- Y_VERIFY_S(res, "Immediate Control " << name << " was registered before "
- << "TImmediateControlsConfigurator creation");
+
+ // When we register control it is possible that is has already been
+ // registered by some other code, in which case it may be used before it
+ // is properly configured. It can currently only happen in configurator
+ // tests, where it is created very late after some tablets have already
+ // started.
+ auto res = board->RegisterSharedControl(Controls[name], name);
+ Y_VERIFY_S(res || allowExisting,
+ "Immediate Control " << name << " was registered before "
+ << "TImmediateControlsConfigurator creation");
+ if (Y_UNLIKELY(!res)) {
+ Cerr << "WARNING: immediate control " << name << " was registered before "
+ << "TImmediateControlsConfigurator creation. "
+ << "A default value may have been used before it was configured." << Endl;
+ Controls[name].Reset(defaultValue, minValue, maxValue);
+ }
}
void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg,
@@ -192,9 +210,10 @@ TString TImmediateControlsConfigurator::MakePrefix(const TString &prefix,
}
IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg)
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls)
{
- return new TImmediateControlsConfigurator(board, cfg);
+ return new TImmediateControlsConfigurator(board, cfg, allowExistingControls);
}
} // namespace NConsole
diff --git a/ydb/core/cms/console/immediate_controls_configurator.h b/ydb/core/cms/console/immediate_controls_configurator.h
index c6c4ba029d..53f1b3e5d7 100644
--- a/ydb/core/cms/console/immediate_controls_configurator.h
+++ b/ydb/core/cms/console/immediate_controls_configurator.h
@@ -13,7 +13,8 @@ namespace NConsole {
* immediate control board via CMS.
*/
IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg);
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls = false);
} // namespace NConsole
} // namespace NKikimr
diff --git a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
index 8ad8f6ed1b..2d2a9f8251 100644
--- a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
+++ b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
@@ -57,7 +57,8 @@ NKikimrConsole::TConfigItem ITEM_CONTROLS_EXCEED_MAX;
void InitImmediateControlsConfigurator(TTenantTestRuntime &runtime)
{
runtime.Register(CreateImmediateControlsConfigurator(runtime.GetAppData().Icb,
- NKikimrConfig::TImmediateControlsConfig()));
+ NKikimrConfig::TImmediateControlsConfig(),
+ /* allowExistingControls */ true));
TDispatchOptions options;
options.FinalEvents.emplace_back(TEvConfigsDispatcher::EvSetConfigSubscriptionResponse, 1);
runtime.DispatchEvents(options);
diff --git a/ydb/core/cms/json_proxy_proto.h b/ydb/core/cms/json_proxy_proto.h
index b1b87f30cd..9c66617066 100644
--- a/ydb/core/cms/json_proxy_proto.h
+++ b/ydb/core/cms/json_proxy_proto.h
@@ -73,6 +73,8 @@ protected:
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TDataShardControls::TExecutionProfileOptions::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTxLimitControls")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTxLimitControls::descriptor(), ctx);
+ else if (name == ".NKikimrConfig.TImmediateControlsConfig.TCoordinatorControls")
+ return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TCoordinatorControls::descriptor(), ctx);
}
ctx.Send(RequestEvent->Sender,
diff --git a/ydb/core/control/immediate_control_board_control.cpp b/ydb/core/control/immediate_control_board_control.cpp
index 7762cf9d88..6acb68e985 100644
--- a/ydb/core/control/immediate_control_board_control.cpp
+++ b/ydb/core/control/immediate_control_board_control.cpp
@@ -15,6 +15,13 @@ void TControl::Set(TAtomicBase newValue) {
AtomicSet(Default, newValue);
}
+void TControl::Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) {
+ Value = defaultValue;
+ Default = defaultValue;
+ LowerBound = lowerBound;
+ UpperBound = upperBound;
+}
+
TAtomicBase TControl::SetFromHtmlRequest(TAtomicBase newValue) {
TAtomicBase prevValue = AtomicGet(Value);
if (newValue == AtomicGet(Default)) {
diff --git a/ydb/core/control/immediate_control_board_control.h b/ydb/core/control/immediate_control_board_control.h
index 7fd038b03a..381b9cb0fc 100644
--- a/ydb/core/control/immediate_control_board_control.h
+++ b/ydb/core/control/immediate_control_board_control.h
@@ -16,6 +16,7 @@ public:
TControl(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound);
void Set(TAtomicBase newValue);
+ void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound);
TAtomicBase SetFromHtmlRequest(TAtomicBase newValue);
diff --git a/ydb/core/control/immediate_control_board_impl.cpp b/ydb/core/control/immediate_control_board_impl.cpp
index fa26926fae..06f6dab1a8 100644
--- a/ydb/core/control/immediate_control_board_impl.cpp
+++ b/ydb/core/control/immediate_control_board_impl.cpp
@@ -15,8 +15,14 @@ bool TControlBoard::RegisterLocalControl(TControlWrapper control, TString name)
return result;
}
-void TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) {
- control.Control = Board.InsertIfAbsent(name, control.Control);
+bool TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) {
+ auto& ptr = Board.InsertIfAbsent(name, control.Control);
+ if (control.Control == ptr) {
+ return true;
+ } else {
+ control.Control = ptr;
+ return false;
+ }
}
void TControlBoard::RestoreDefaults() {
diff --git a/ydb/core/control/immediate_control_board_impl.h b/ydb/core/control/immediate_control_board_impl.h
index a01e09f4d6..f5463aa12d 100644
--- a/ydb/core/control/immediate_control_board_impl.h
+++ b/ydb/core/control/immediate_control_board_impl.h
@@ -14,7 +14,7 @@ private:
public:
bool RegisterLocalControl(TControlWrapper control, TString name);
- void RegisterSharedControl(TControlWrapper& control, TString name);
+ bool RegisterSharedControl(TControlWrapper& control, TString name);
void RestoreDefaults();
diff --git a/ydb/core/control/immediate_control_board_wrapper.h b/ydb/core/control/immediate_control_board_wrapper.h
index ce8a6adde5..e2da0d80d0 100644
--- a/ydb/core/control/immediate_control_board_wrapper.h
+++ b/ydb/core/control/immediate_control_board_wrapper.h
@@ -29,6 +29,15 @@ public:
bool IsTheSame(TControlWrapper another) {
return Control == another.Control;
}
+
+ /**
+ * Resets an existing control to different values.
+ *
+ * WARNING: this method is not thread safe and may only be used during initialization.
+ */
+ void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) {
+ Control->Reset(defaultValue, lowerBound, upperBound);
+ }
};
class TMemorizableControlWrapper {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ebc7d0eaa3..e386092b17 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1216,8 +1216,22 @@ message TImmediateControlsConfig {
DefaultValue: 60000 }];
}
+ message TCoordinatorControls {
+ optional uint64 EnableLeaderLeases = 1 [(ControlOptions) = {
+ Description: "Enables leader leases for processing read-only queries",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 MinLeaderLeaseDurationUs = 2 [(ControlOptions) = {
+ Description: "The minimum leader lease duration in microseconds",
+ MinValue: 1000,
+ MaxValue: 5000000,
+ DefaultValue: 250000 }];
+ }
+
optional TDataShardControls DataShardControls = 1;
optional TTxLimitControls TxLimitControls = 2;
+ optional TCoordinatorControls CoordinatorControls = 3;
};
message TMeteringConfig {
diff --git a/ydb/core/protos/counters_coordinator.proto b/ydb/core/protos/counters_coordinator.proto
index 28433e3e1a..f5437245bb 100644
--- a/ydb/core/protos/counters_coordinator.proto
+++ b/ydb/core/protos/counters_coordinator.proto
@@ -12,7 +12,7 @@ enum ESimpleCounters {
}
enum ECumulativeCounters {
- COUNTER_CUMULATIVE_IGNORE = 0;
+ COUNTER_REQ_ACQUIRE_READ_STEP = 0 [(CounterOpts) = {Name: "AcquireReadStepRequests"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto
index f0bec238e7..9e51d432da 100644
--- a/ydb/core/protos/tablet.proto
+++ b/ydb/core/protos/tablet.proto
@@ -60,6 +60,11 @@ message TTabletTypes {
}
}
+message TTabletLogMetadata {
+ optional uint32 Key = 1;
+ optional bytes Data = 2;
+}
+
message TTabletLogEntry {
// normal log entries
repeated uint32 DependsOn = 1;
@@ -79,6 +84,7 @@ message TTabletLogEntry {
repeated fixed64 ZeroTailBitmask = 12;
optional bytes EmbeddedLogBody = 13;
+ repeated TTabletLogMetadata EmbeddedMetadata = 14;
}
message TTabletChannelInfo {
@@ -238,3 +244,11 @@ message TEvTabletStop {
optional uint64 TabletID = 1;
optional EReason Reason = 2;
}
+
+message TEvDropLease {
+ optional uint64 TabletID = 1;
+}
+
+message TEvLeaseDropped {
+ optional uint64 TabletID = 1;
+}
diff --git a/ydb/core/tablet/tablet_req_rebuildhistory.cpp b/ydb/core/tablet/tablet_req_rebuildhistory.cpp
index c0b4c0c2bc..620b25c1a8 100644
--- a/ydb/core/tablet/tablet_req_rebuildhistory.cpp
+++ b/ydb/core/tablet/tablet_req_rebuildhistory.cpp
@@ -44,6 +44,7 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
bool IsSnapshot;
bool IsTotalSnapshot;
TString EmbeddedLogBody;
+ TVector<TEvTablet::TCommitMetadata> EmbeddedMetadata;
TVector<TLogoBlobID> GcDiscovered;
TVector<TLogoBlobID> GcLeft;
@@ -125,6 +126,14 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
GcLeft[i] = LogoBlobIDFromLogoBlobID(x.GetGcLeft(i));
}
+ if (const size_t metaSize = x.EmbeddedMetadataSize()) {
+ EmbeddedMetadata.reserve(metaSize);
+ for (size_t i = 0; i < metaSize; ++i) {
+ const auto& meta = x.GetEmbeddedMetadata(i);
+ EmbeddedMetadata.emplace_back(meta.GetKey(), meta.GetData());
+ }
+ }
+
switch (Status) {
case StatusUnknown:
Status = StatusBody;
@@ -679,9 +688,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
}());
if (entry.EmbeddedLogBody)
- graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
else
- graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
if (lastUnbrokenTailEntry + 1 == id.second)
lastUnbrokenTailEntry = id.second;
@@ -717,9 +726,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
}());
if (entry.EmbeddedLogBody)
- graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
else
- graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
hasSnapshotInGeneration |= entry.IsSnapshot;
break;
diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp
index 3f3081f128..4537a4af91 100644
--- a/ydb/core/tablet/tablet_sys.cpp
+++ b/ydb/core/tablet/tablet_sys.cpp
@@ -1164,6 +1164,16 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) {
entry->FollowerUpdate->Body = msg->EmbeddedLogBody;
}
+ if (!msg->EmbeddedMetadata.empty()) {
+ auto *m = x->MutableEmbeddedMetadata();
+ m->Reserve(msg->EmbeddedMetadata.size());
+ for (const auto &meta : msg->EmbeddedMetadata) {
+ auto *p = m->Add();
+ p->SetKey(meta.Key);
+ p->SetData(meta.Data);
+ }
+ }
+
if (saveFollowerUpdate && msg->FollowerAux)
entry->FollowerUpdate->AuxPayload = msg->FollowerAux;
diff --git a/ydb/core/tablet_flat/CMakeLists.txt b/ydb/core/tablet_flat/CMakeLists.txt
index 93bdf75605..b5b4b43084 100644
--- a/ydb/core/tablet_flat/CMakeLists.txt
+++ b/ydb/core/tablet_flat/CMakeLists.txt
@@ -35,6 +35,7 @@ target_proto_messages(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor.proto
)
target_sources(ydb-core-tablet_flat PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_boot_lease.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_boot_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
diff --git a/ydb/core/tablet_flat/flat_boot_cookie.h b/ydb/core/tablet_flat/flat_boot_cookie.h
index 58e810325a..4ff737c2bd 100644
--- a/ydb/core/tablet_flat/flat_boot_cookie.h
+++ b/ydb/core/tablet_flat/flat_boot_cookie.h
@@ -68,6 +68,10 @@ namespace NBoot {
const ui32 Raw = 0; /* only lower 24 bits are used */
};
+ enum class ELogCommitMeta : ui32 {
+ LeaseInfo = 1,
+ };
+
}
}
}
diff --git a/ydb/core/tablet_flat/flat_boot_lease.cpp b/ydb/core/tablet_flat/flat_boot_lease.cpp
new file mode 100644
index 0000000000..0fe6966b49
--- /dev/null
+++ b/ydb/core/tablet_flat/flat_boot_lease.cpp
@@ -0,0 +1,110 @@
+#include "flat_executor_bootlogic.h"
+#include <ydb/core/util/pb.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+
+namespace NKikimr {
+namespace NTabletFlatExecutor {
+
+class TLeaseWaiter : public TActorBootstrapped<TLeaseWaiter> {
+public:
+ TLeaseWaiter(
+ const TActorId& owner,
+ TMonotonic bootTimestamp,
+ const TActorId& leaseHolder,
+ TDuration leaseDuration)
+ : Owner(owner)
+ , BootTimestamp(bootTimestamp)
+ , LeaseHolder(leaseHolder)
+ , LeaseDuration(leaseDuration)
+ { }
+
+ void Bootstrap() {
+ SendRequest();
+ Schedule(BootTimestamp + LeaseDuration * 2, new TEvents::TEvWakeup);
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvents::TEvPoison, PassAway);
+ sFunc(TEvents::TEvWakeup, HandleTimeout);
+ hFunc(TEvTablet::TEvLeaseDropped, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ }
+ }
+
+ void SendRequest() {
+ Send(LeaseHolder, new TEvTablet::TEvDropLease(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
+ }
+
+ void Finish() {
+ Send(Owner, new TEvTablet::TEvLeaseDropped());
+ PassAway();
+ }
+
+ void HandleTimeout() {
+ Finish();
+ }
+
+ void Handle(TEvTablet::TEvLeaseDropped::TPtr&) {
+ Finish();
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr&) {
+ // We don't need to do anything
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ // Keep retrying until timeout is reached
+ SendRequest();
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ auto* msg = ev->Get();
+ Y_VERIFY(ev->Sender == LeaseHolder);
+ Y_VERIFY(msg->SourceType == TEvTablet::TEvDropLease::EventType);
+ if (msg->Reason == TEvents::TEvUndelivered::ReasonActorUnknown) {
+ // We have proved lease holder no longer exists
+ return Finish();
+ }
+
+ // We may get undelivered notifications due to disconnections
+ // Since we cannot trust them we expect to retry on TEvNodeDisconnected
+ }
+
+private:
+ const TActorId Owner;
+ const TMonotonic BootTimestamp;
+ const TActorId LeaseHolder;
+ const TDuration LeaseDuration;
+};
+
+void TExecutorBootLogic::StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept
+{
+ TActorId leaseHolder;
+ TDuration leaseDuration;
+
+ for (const auto& entry : graph.Entries) {
+ for (const auto& meta : entry.EmbeddedMetadata) {
+ if (NBoot::ELogCommitMeta(meta.Key) == NBoot::ELogCommitMeta::LeaseInfo) {
+ TProtoBox<NKikimrExecutorFlat::TLeaseInfoMetadata> proto(meta.Data);
+ if (proto.HasLeaseHolder()) {
+ leaseHolder = ActorIdFromProto(proto.GetLeaseHolder());
+ }
+ if (proto.HasLeaseDurationUs()) {
+ leaseDuration = TDuration::MicroSeconds(proto.GetLeaseDurationUs());
+ }
+ }
+ }
+ }
+
+ if (leaseHolder && leaseDuration) {
+ LeaseWaiter = Ops->RegisterWithSameMailbox(new TLeaseWaiter(SelfId, bootTimestamp, leaseHolder, leaseDuration));
+ }
+}
+
+} // namespace NTabletFlatExecutor
+} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/flat_exec_commit.h b/ydb/core/tablet_flat/flat_exec_commit.h
index 7212449c25..a1402dcf24 100644
--- a/ydb/core/tablet_flat/flat_exec_commit.h
+++ b/ydb/core/tablet_flat/flat_exec_commit.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/base/logoblob.h>
+#include <ydb/core/base/tablet.h>
#include <util/generic/vector.h>
#include <util/generic/string.h>
@@ -53,6 +54,7 @@ namespace NTabletFlatExecutor {
TString FollowerAux;
TVector<TLogoBlob> Refs;
TGCBlobDelta GcDelta;
+ TVector<TEvTablet::TCommitMetadata> Metadata;
TSeat *FirstTx = nullptr;
TSeat *LastTx = nullptr;
};
diff --git a/ydb/core/tablet_flat/flat_exec_commit_mgr.h b/ydb/core/tablet_flat/flat_exec_commit_mgr.h
index 20f5d5ffe1..caeabb014f 100644
--- a/ydb/core/tablet_flat/flat_exec_commit_mgr.h
+++ b/ydb/core/tablet_flat/flat_exec_commit_mgr.h
@@ -175,6 +175,7 @@ namespace NTabletFlatExecutor {
ev->FollowerAux = std::move(commit.FollowerAux);
ev->GcDiscovered = std::move(commit.GcDelta.Created);
ev->GcLeft = std::move(commit.GcDelta.Deleted);
+ ev->EmbeddedMetadata = std::move(commit.Metadata);
Ops->Send(Owner, ev, 0, ui64(commit.Type));
}
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index e38dda8373..7f8c5550ad 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -408,6 +408,15 @@ void TExecutor::Active(const TActorContext &ctx) {
CompactionLogic->UpdateInMemStatsStep(it.first, 0, Database->GetTableMemSize(it.first));
UpdateCompactions();
+
+ LeaseEnabled = Owner->ReadOnlyLeaseEnabled();
+ if (LeaseEnabled) {
+ LeaseDuration = Owner->ReadOnlyLeaseDuration();
+ if (!LeaseDuration) {
+ LeaseEnabled = false;
+ }
+ }
+
MakeLogSnapshot();
if (auto error = CheckBorrowConsistency()) {
@@ -432,8 +441,6 @@ void TExecutor::TranscriptBootOpResult(ui32 res, const TActorContext &ctx) {
case TExecutorBootLogic::OpResultComplete:
return Active(ctx);
case TExecutorBootLogic::OpResultBroken:
- BootLogic.Destroy();
-
if (auto logl = Logger->Log(ELnLev::Error)) {
logl << NFmt::Do(*this) << " Broken while booting";
}
@@ -453,10 +460,10 @@ void TExecutor::TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ct
case TExecutorBootLogic::OpResultComplete:
return ActivateFollower(ctx);
case TExecutorBootLogic::OpResultBroken:
- BootLogic.Destroy();
if (auto logl = Logger->Log(ELnLev::Error)) {
logl << NFmt::Do(*this) << " Broken while follower booting";
}
+
return Broken();
default:
Y_FAIL("unknown boot result");
@@ -588,6 +595,7 @@ TExecutorCaches TExecutor::CleanupState() {
TExecutorCaches caches;
if (BootLogic) {
+ BootLogic->Cancel();
caches = BootLogic->DetachCaches();
} else {
if (PrivatePageCache) {
@@ -1395,6 +1403,85 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
}
+TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ Y_VERIFY(at >= LeaseEnd);
+
+ if (!LeaseEnabled) {
+ // Automatically enable leases
+ LeaseEnabled = true;
+ LeaseDuration = Owner->ReadOnlyLeaseDuration();
+ Y_VERIFY(LeaseDuration);
+ }
+
+ // Try to find a suitable commit that is already in flight
+ TLeaseCommit* lease = nullptr;
+ for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) {
+ if (at < it->LeaseEnd) {
+ lease = &*it;
+ } else {
+ break;
+ }
+ }
+
+ if (!lease) {
+ if (LeaseDropped) {
+ // We cannot start new lease confirmations
+ return nullptr;
+ }
+
+ LogicRedo->FlushBatchedLog();
+
+ auto commit = CommitManager->Begin(true, ECommit::Misc);
+
+ NKikimrExecutorFlat::TLeaseInfoMetadata proto;
+ ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
+ proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
+
+ TString data;
+ bool ok = proto.SerializeToString(&data);
+ Y_VERIFY(ok);
+
+ commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
+
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+
+ CommitManager->Commit(commit);
+ }
+
+ return lease;
+}
+
+void TExecutor::ConfirmReadOnlyLease(TMonotonic at) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ LeaseUsed = true;
+
+ if (LeaseEnabled && at < LeaseEnd) {
+ return;
+ }
+
+ EnsureReadOnlyLease(at);
+}
+
+void TExecutor::ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ LeaseUsed = true;
+
+ if (LeaseEnabled && at < LeaseEnd) {
+ callback();
+ return;
+ }
+
+ if (auto* lease = EnsureReadOnlyLease(at)) {
+ lease->Callbacks.push_back(std::move(callback));
+ }
+}
+
+void TExecutor::ConfirmReadOnlyLease(std::function<void()> callback) {
+ ConfirmReadOnlyLease(AppData()->MonotonicTimeProvider->Now(), std::move(callback));
+}
+
bool TExecutor::CanExecuteTransaction() const {
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
}
@@ -2389,6 +2476,21 @@ void TExecutor::MakeLogSnapshot() {
GcLogic->SnapToLog(snap, commit->Step);
LogicSnap->MakeSnap(snap, *commit, Logger.Get());
+ if (LeaseEnabled) {
+ NKikimrExecutorFlat::TLeaseInfoMetadata proto;
+ ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
+ proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
+
+ TString data;
+ bool ok = proto.SerializeToString(&data);
+ Y_VERIFY(ok);
+
+ commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
+
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+ }
+
CommitManager->Commit(commit);
CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());
@@ -2605,6 +2707,41 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) {
}
}
+void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx) {
+ TMonotonic ts = AppData(ctx)->MonotonicTimeProvider->Now();
+
+ LeaseDropped = true;
+ LeaseEnd = Min(LeaseEnd, ts);
+
+ for (auto& l : LeaseCommits) {
+ l.LeaseEnd = Min(l.LeaseEnd, ts);
+ }
+
+ ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped);
+ Owner->ReadOnlyLeaseDropped();
+}
+
+void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &) {
+ Y_VERIFY(LeaseExtendPending);
+ LeaseExtendPending = false;
+
+ if (!LeaseCommits.empty() || !LeaseEnabled || LeaseDropped) {
+ return;
+ }
+
+ if (LeaseUsed) {
+ LeaseUsed = false;
+ UnusedLeaseExtensions = 0;
+ } else if (UnusedLeaseExtensions >= 5) {
+ return;
+ } else {
+ ++UnusedLeaseExtensions;
+ }
+
+ // Start a new lease extension commit
+ EnsureReadOnlyLease(LeaseEnd);
+}
+
void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx) {
TEvTablet::TEvCommitResult *msg = ev->Get();
@@ -2631,6 +2768,41 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
<< " for step " << step;
}
+ if (!LeaseCommits.empty()) {
+ auto& l = LeaseCommits.front();
+ Y_VERIFY(step <= l.Step);
+ if (step == l.Step) {
+ LeasePersisted = true;
+ LeaseEnd = Max(LeaseEnd, l.LeaseEnd);
+
+ while (!l.Callbacks.empty()) {
+ // Note: callback may (though unlikely) recursively add more callbacks
+ TVector<std::function<void()>> callbacks;
+ callbacks.swap(l.Callbacks);
+ for (auto& callback : callbacks) {
+ callback();
+ }
+ }
+
+ LeaseCommits.pop_front();
+
+ // Calculate a full round-trip latency for leases
+ // When this latency is larger than third of lease duration we want
+ // to increase lease duration so we would have enough time for
+ // processing read-only requests without additional commits
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ if ((LeaseEnd - ts) < LeaseDuration / 3) {
+ LeaseDuration *= 2;
+ }
+
+ // We want to schedule a new commit before the lease expires
+ if (LeaseCommits.empty() && !LeaseExtendPending) {
+ Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend);
+ LeaseExtendPending = true;
+ }
+ }
+ }
+
switch (cookie) {
case ECommit::Redo:
{
@@ -3562,11 +3734,13 @@ STFUNC(TExecutor::StateWork) {
CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters);
cFunc(TEvPrivate::EvCheckYellow, UpdateYellow);
cFunc(TEvPrivate::EvUpdateCompactions, UpdateCompactions);
+ HFunc(TEvPrivate::TEvLeaseExtend, Handle);
HFunc(TEvents::TEvWakeup, Wakeup);
hFunc(TEvents::TEvFlushLog, Handle);
hFunc(NSharedCache::TEvRequest, Handle);
hFunc(NSharedCache::TEvResult, Handle);
hFunc(NSharedCache::TEvUpdated, Handle);
+ HFunc(TEvTablet::TEvDropLease, Handle);
HFunc(TEvTablet::TEvCommitResult, Handle);
hFunc(TEvTablet::TEvCheckBlobstorageStatusResult, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index df08b3a338..3a61e5385d 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -334,6 +334,7 @@ class TExecutor
EvActivateCompactionRead,
EvActivateCompactionChanges,
EvBrokenTransaction,
+ EvLeaseExtend,
EvEnd
};
@@ -347,6 +348,7 @@ class TExecutor
struct TEvActivateCompactionRead : public TEventLocal<TEvActivateCompactionRead, EvActivateCompactionRead> {};
struct TEvActivateCompactionChanges : public TEventLocal<TEvActivateCompactionChanges, EvActivateCompactionChanges> {};
struct TEvBrokenTransaction : public TEventLocal<TEvBrokenTransaction, EvBrokenTransaction> {};
+ struct TEvLeaseExtend : public TEventLocal<TEvLeaseExtend, EvLeaseExtend> {};
};
const TIntrusivePtr<ITimeProvider> Time = nullptr;
@@ -356,6 +358,37 @@ class TExecutor
ui32 FollowerId = 0;
+ // This becomes true when executor enables the use of leases, e.g. starts persisting them
+ // This may become false again when leases are not actively used for some time
+ bool LeaseEnabled = false;
+ // As soon as lease is persisted we may theoretically use read-only checks for lease prolongation
+ bool LeasePersisted = false;
+ // When lease is dropped we must stop accepting new lease-dependent requests
+ bool LeaseDropped = false;
+ // When lease is used in any given cycle this becomes true
+ bool LeaseUsed = false;
+ // This flag marks when TEvLeaseExtend message is already pending
+ bool LeaseExtendPending = false;
+ TDuration LeaseDuration;
+ TMonotonic LeaseEnd;
+ // Counts the number of times an unused lease has been extended
+ size_t UnusedLeaseExtensions = 0;
+
+ struct TLeaseCommit {
+ const ui32 Step;
+ const TMonotonic Start;
+ TMonotonic LeaseEnd;
+ TVector<std::function<void()>> Callbacks;
+
+ TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd)
+ : Step(step)
+ , Start(start)
+ , LeaseEnd(leaseEnd)
+ { }
+ };
+
+ TList<TLeaseCommit> LeaseCommits;
+
using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>;
THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue;
THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> PendingQueue;
@@ -502,6 +535,8 @@ class TExecutor
void ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch);
void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPrivate::TEvLeaseExtend::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx);
@@ -575,6 +610,11 @@ public:
void DetachTablet(const TActorContext &ctx) override;
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
+ TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
+ void ConfirmReadOnlyLease(TMonotonic at) override;
+ void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override;
+ void ConfirmReadOnlyLease(std::function<void()> callback) override;
+
TString BorrowSnapshot(ui32 tableId, const TTableSnapshotContext& snap, TRawVals from, TRawVals to, ui64 loaner) const override;
ui64 MakeScanSnapshot(ui32 table) override;
diff --git a/ydb/core/tablet_flat/flat_executor.proto b/ydb/core/tablet_flat/flat_executor.proto
index 7758812187..0b865d16df 100644
--- a/ydb/core/tablet_flat/flat_executor.proto
+++ b/ydb/core/tablet_flat/flat_executor.proto
@@ -1,6 +1,7 @@
import "ydb/core/protos/base.proto";
import "ydb/core/protos/tablet.proto";
import "ydb/core/protos/flat_scheme_op.proto";
+import "library/cpp/actors/protos/actors.proto";
package NKikimrExecutorFlat;
option java_package = "ru.yandex.kikimr.proto";
@@ -247,3 +248,13 @@ message TDatabaseBorrowPart {
repeated TPartInfo Parts = 3;
repeated TTxStatus TxStatusParts = 4;
}
+
+message TLeaseInfoMetadata {
+ // Actor id of the lease holder, which is an executor actor
+ optional NActorsProto.TActorId LeaseHolder = 1;
+
+ // Lease duration in microseconds. Lease holder may perform leader-only
+ // tasks up to this duration since the last channel 0 block confirmation,
+ // and expects future generations will not interfere.
+ optional uint32 LeaseDurationUs = 2;
+}
diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
index cf72960f9c..4292a61d3c 100644
--- a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
@@ -103,9 +103,14 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot(
TEvTablet::TEvBoot::TPtr &ev,
TExecutorCaches &&caches)
{
- PrepareEnv(false, ev->Get()->Generation, std::move(caches));
+ TEvTablet::TEvBoot *msg = ev->Get();
+ PrepareEnv(false, msg->Generation, std::move(caches));
- Steps->Spawn<NBoot::TStages>(std::move(ev->Get()->DependencyGraph), nullptr);
+ if (msg->DependencyGraph) {
+ StartLeaseWaiter(BootTimestamp, *msg->DependencyGraph);
+ }
+
+ Steps->Spawn<NBoot::TStages>(std::move(msg->DependencyGraph), nullptr);
Steps->Execute();
return CheckCompletion();
@@ -113,7 +118,7 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot(
void TExecutorBootLogic::PrepareEnv(bool follower, ui32 gen, TExecutorCaches caches) noexcept
{
- BootStartTime = TAppData::TimeProvider->Now();
+ BootTimestamp = AppData()->MonotonicTimeProvider->Now();
auto *sys = TlsActivationContext->ExecutorThread.ActorSystem;
auto *logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_FLATBOOT);
@@ -212,9 +217,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::CheckCompletion()
if (Loads)
return OpResultContinue;
+ if (LeaseWaiter)
+ return OpResultContinue;
+
if (State().Follower || Restored) {
if (auto logl = Steps->Logger()->Log(ELnLev::Info)) {
- auto spent = TAppData::TimeProvider->Now() - BootStartTime;
+ auto spent = AppData()->MonotonicTimeProvider->Now() - BootTimestamp;
logl
<< NFmt::Do(State()) << " booting completed"
@@ -278,6 +286,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl
step.Drop();
Steps->Execute();
+ } else if (auto *msg = ev.CastAsLocal<TEvTablet::TEvLeaseDropped>()) {
+ if (LeaseWaiter != ev.Sender) {
+ return OpResultUnhandled;
+ }
+
+ LeaseWaiter = { };
} else {
return OpResultUnhandled;
}
@@ -291,6 +305,9 @@ TAutoPtr<NBoot::TResult> TExecutorBootLogic::ExtractState() noexcept {
}
void TExecutorBootLogic::Cancel() {
+ if (LeaseWaiter) {
+ Ops->Send(LeaseWaiter, new TEvents::TEvPoison);
+ }
}
void TExecutorBootLogic::FollowersSyncComplete() {
diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.h b/ydb/core/tablet_flat/flat_executor_bootlogic.h
index 34edb2d6b4..56084c36b0 100644
--- a/ydb/core/tablet_flat/flat_executor_bootlogic.h
+++ b/ydb/core/tablet_flat/flat_executor_bootlogic.h
@@ -74,8 +74,9 @@ private:
TAutoPtr<NBoot::TBack> State_;
TAutoPtr<NBoot::TResult> Result_;
TAutoPtr<NBoot::TRoot> Steps;
+ TActorId LeaseWaiter;
- TInstant BootStartTime;
+ TMonotonic BootTimestamp;
const TIntrusiveConstPtr<TTabletStorageInfo> Info;
@@ -91,6 +92,7 @@ private:
EOpResult CheckCompletion();
void PrepareEnv(bool follower, ui32 generation, TExecutorCaches caches) noexcept;
+ void StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept;
ui32 GetBSGroupFor(const TLogoBlobID &logo) const;
ui32 GetBSGroupID(ui32 channel, ui32 generation);
void LoadEntry(TIntrusivePtr<NBoot::TLoadBlobs>);
diff --git a/ydb/core/tablet_flat/flat_executor_leases_ut.cpp b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
new file mode 100644
index 0000000000..3761f4fcd0
--- /dev/null
+++ b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
@@ -0,0 +1,365 @@
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/base/statestorage.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NTabletFlatExecutor {
+
+Y_UNIT_TEST_SUITE(TFlatExecutorLeases) {
+
+ struct TEvLeasesTablet {
+ enum EEv {
+ EvWrite = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvWriteAck,
+ EvRead,
+ EvReadResult,
+ };
+
+ struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
+ TEvWrite(ui64 key, ui64 value)
+ : Key(key)
+ , Value(value)
+ { }
+
+ const ui64 Key;
+ const ui64 Value;
+ };
+
+ struct TEvWriteAck : public TEventLocal<TEvWriteAck, EvWriteAck> {
+ TEvWriteAck(ui64 key)
+ : Key(key)
+ { }
+
+ const ui64 Key;
+ };
+
+ struct TEvRead : public TEventLocal<TEvRead, EvRead> {
+ TEvRead(ui64 key)
+ : Key(key)
+ { }
+
+ const ui64 Key;
+ };
+
+ struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> {
+ TEvReadResult(ui64 key, ui64 value)
+ : Key(key)
+ , Value(value)
+ { }
+
+ const ui64 Key;
+ const ui64 Value;
+ };
+ };
+
+ class TLeasesTablet
+ : public TActor<TLeasesTablet>
+ , public TTabletExecutedFlat
+ {
+ public:
+ TLeasesTablet(const TActorId& tablet, TTabletStorageInfo* info, bool enableInitialLease)
+ : TActor(&TThis::StateInit)
+ , TTabletExecutedFlat(info, tablet, nullptr)
+ , EnableInitialLease(enableInitialLease)
+ { }
+
+ private:
+ struct Schema : NIceDb::Schema {
+ struct Data : Table<1> {
+ struct Key : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct Value : Column<2, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<Key>;
+ using TColumns = TableColumns<Key, Value>;
+ };
+
+ using TTables = SchemaTables<Data>;
+ };
+
+ using TTxBase = TTransactionBase<TLeasesTablet>;
+
+ struct TTxInitSchema : public TTxBase {
+ TTxInitSchema(TSelf* self)
+ : TTxBase(self)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ NIceDb::TNiceDb db(txc.DB);
+ db.Materialize<Schema>();
+ Self->RunTxInit();
+ return true;
+ }
+
+ void Complete(const TActorContext&) {
+ // nothing
+ }
+ };
+
+ void RunTxInitSchema() {
+ Execute(new TTxInitSchema(this));
+ }
+
+ struct TTxInit : public TTxBase {
+ TTxInit(TSelf* self)
+ : TTxBase(self)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ NIceDb::TNiceDb db(txc.DB);
+
+ auto rowset = db.Table<Schema::Data>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ ui64 key = rowset.GetValue<Schema::Data::Key>();
+ ui64 value = rowset.GetValue<Schema::Data::Value>();
+ Self->Data[key] = value;
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) {
+ Self->SwitchToWork(ctx);
+ }
+ };
+
+ void RunTxInit() {
+ Execute(new TTxInit(this));
+ }
+
+ struct TTxWrite : public TTxBase {
+ TTxWrite(TSelf* self, TEvLeasesTablet::TEvWrite::TPtr&& ev)
+ : TTxBase(self)
+ , Ev(std::move(ev))
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ auto* msg = Ev->Get();
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ db.Table<Schema::Data>().Key(msg->Key).Update(
+ NIceDb::TUpdate<Schema::Data::Value>(msg->Value));
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) {
+ auto* msg = Ev->Get();
+ Self->Data[msg->Key] = msg->Value;
+
+ ctx.Send(Ev->Sender, new TEvLeasesTablet::TEvWriteAck(msg->Key), 0, Ev->Cookie);
+ }
+
+ const TEvLeasesTablet::TEvWrite::TPtr Ev;
+ };
+
+ void Handle(TEvLeasesTablet::TEvWrite::TPtr& ev) {
+ Execute(new TTxWrite(this, std::move(ev)));
+ }
+
+ void Handle(TEvLeasesTablet::TEvRead::TPtr& ev) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto key = ev->Get()->Key;
+ auto value = Data[key];
+ Executor()->ConfirmReadOnlyLease([this, sender, cookie, key, value]() {
+ Send(sender, new TEvLeasesTablet::TEvReadResult(key, value), 0, cookie);
+ });
+ }
+
+ private:
+ void OnDetach(const TActorContext& ctx) override {
+ Die(ctx);
+ }
+
+ void OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext& ctx) override {
+ Die(ctx);
+ }
+
+ void OnActivateExecutor(const TActorContext&) override {
+ Become(&TThis::StateWork);
+ RunTxInitSchema();
+ }
+
+ void DefaultSignalTabletActive(const TActorContext&) override {
+ // nothing
+ }
+
+ void SwitchToWork(const TActorContext& ctx) {
+ SignalTabletActive(ctx);
+ }
+
+ bool ReadOnlyLeaseEnabled() override {
+ return EnableInitialLease;
+ }
+
+ private:
+ STFUNC(StateInit) {
+ StateInitImpl(ev, ctx);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvLeasesTablet::TEvWrite, Handle);
+ hFunc(TEvLeasesTablet::TEvRead, Handle);
+ default:
+ HandleDefaultEvents(ev, ctx);
+ }
+ }
+
+ private:
+ const bool EnableInitialLease;
+ THashMap<ui64, ui64> Data;
+ };
+
+ void DoBasics(bool deliverDropLease, bool enableInitialLease) {
+ TTestBasicRuntime runtime(2);
+ SetupTabletServices(runtime);
+ runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+
+ const ui64 tabletId = TTestTxConfig::TxTablet0;
+
+ auto boot1 = CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY),
+ [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) {
+ return new TLeasesTablet(tablet, info, enableInitialLease);
+ });
+ runtime.EnableScheduleForActor(boot1);
+
+ {
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1));
+ runtime.DispatchEvents(options);
+ }
+
+ auto sender = runtime.AllocateEdgeActor(0);
+ auto pipe1 = runtime.ConnectToPipe(tabletId, sender, 0, NTabletPipe::TClientRetryPolicy::WithRetries());
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvWrite(1, 11));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvWriteAck>(sender);
+ }
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ bool blockDropLease = true;
+ TVector<THolder<IEventHandle>> dropLeaseMsgs;
+ auto observerFunc = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTablet::TEvDropLease::EventType:
+ if (blockDropLease) {
+ dropLeaseMsgs.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ case TEvTablet::TEvTabletDead::EventType:
+ // Prevent tablets from restarting
+ // This is most important for the boot1 actor, since it
+ // quickly receives bad commit signal and tries to restart
+ // the original tablet. However we prevent executor from
+ // killing itself too, so we could make additional queries.
+ return TTestActorRuntime::EEventAction::DROP;
+ case TEvTablet::TEvDemoted::EventType:
+ // Block guardian from telling tablet about a new generation
+ return TTestActorRuntime::EEventAction::DROP;
+ case TEvStateStorage::TEvReplicaLeaderDemoted::EventType:
+ // Block replica from telling tablet about a new generation
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(observerFunc);
+
+ auto boot2 = CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY),
+ [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) {
+ return new TLeasesTablet(tablet, info, enableInitialLease);
+ },
+ /* node index */ 1);
+ runtime.EnableScheduleForActor(boot2);
+
+ {
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1));
+ runtime.DispatchEvents(options);
+ }
+
+ // After the new tablet is restored, we should still be able to read from the old tablet for a while
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return dropLeaseMsgs.size() == 1; }, "drop lease message");
+
+ // We expect the new tablet to send a drop lease message, but the old tablet must still be readable, because it didn't receive it yet
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ if (deliverDropLease) {
+ blockDropLease = false;
+ for (auto& ev : dropLeaseMsgs) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ dropLeaseMsgs.clear();
+ }
+
+ auto sender2 = runtime.AllocateEdgeActor(1);
+ auto pipe2 = runtime.ConnectToPipe(tabletId, sender2, 1, NTabletPipe::TClientRetryPolicy::WithRetries());
+ runtime.SendToPipe(pipe2, sender2, new TEvLeasesTablet::TEvRead(1), 1);
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender2);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender, TDuration::Seconds(1));
+ UNIT_ASSERT(!ev);
+ }
+ }
+
+ Y_UNIT_TEST(Basics) {
+ DoBasics(true, false);
+ }
+
+ Y_UNIT_TEST(BasicsLeaseTimeout) {
+ DoBasics(false, false);
+ }
+
+ Y_UNIT_TEST(BasicsInitialLease) {
+ DoBasics(true, true);
+ }
+
+ Y_UNIT_TEST(BasicsInitialLeaseTimeout) {
+ DoBasics(false, true);
+ }
+}
+
+} // namespace NKikimr::NTabletFlatExecutor
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp
index f69991ece9..258addbf7f 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.cpp
+++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp
@@ -44,6 +44,18 @@ namespace NFlatExecutorSetup {
if (launcherID)
LauncherActorID = launcherID;
}
+
+ bool ITablet::ReadOnlyLeaseEnabled() {
+ return false;
+ }
+
+ TDuration ITablet::ReadOnlyLeaseDuration() {
+ return TDuration::MilliSeconds(250);
+ }
+
+ void ITablet::ReadOnlyLeaseDropped() {
+ // nothing by default
+ }
}
}}
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index 075ebce354..babe00d863 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -452,6 +452,10 @@ namespace NFlatExecutorSetup {
virtual void OnLeaderUserAuxUpdate(TString) { /* default */ }
+ virtual bool ReadOnlyLeaseEnabled();
+ virtual TDuration ReadOnlyLeaseDuration();
+ virtual void ReadOnlyLeaseDropped();
+
// create transaction?
protected:
ITablet(TTabletStorageInfo *info, const TActorId &tablet)
@@ -496,6 +500,10 @@ namespace NFlatExecutorSetup {
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
+ virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
+ virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;
+ virtual void ConfirmReadOnlyLease(std::function<void()> callback) = 0;
+
/* Make blob with data required for table bootstapping. Note:
1. Once non-trivial blob obtained and commited in tx all of its
borrowed bundles have to be eventually released (see db).
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
index 720279b57a..c5146c6184 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tablet_flat-ut PUBLIC
ydb-core-scheme
test-libs-exec
test-libs-table
+ ydb-core-testlib
udf-service-exception_policy
)
target_sources(ydb-core-tablet_flat-ut PRIVATE
@@ -32,6 +33,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_database_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_sausagecache_ut.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
index dd2fe3b1bb..fff5984997 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tablet_flat-ut PUBLIC
ydb-core-scheme
test-libs-exec
test-libs-table
+ ydb-core-testlib
udf-service-exception_policy
)
target_sources(ydb-core-tablet_flat-ut PRIVATE
@@ -33,6 +34,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_database_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_sausagecache_ut.cpp
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 9b0c97c731..58d4d05cd3 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -207,9 +207,6 @@ namespace Tests {
const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode;
SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams);
- CreateBootstrapTablets();
- SetupStorage();
-
for (ui32 nodeIdx = 0; nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) {
SetupDomainLocalService(nodeIdx);
Runtime->GetAppData(nodeIdx).AuthConfig.MergeFrom(Settings->AuthConfig);
@@ -223,6 +220,9 @@ namespace Tests {
SetupConfigurators(nodeIdx);
SetupProxies(nodeIdx);
}
+
+ CreateBootstrapTablets();
+ SetupStorage();
}
void TServer::SetupMessageBus(ui16 port, const TString &tracePath) {
diff --git a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
index 8345d93485..e6a36f0007 100644
--- a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
+++ b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
@@ -96,11 +96,27 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID()
<< " HANDLE TEvAcquireReadStep");
+ IncCounter(COUNTER_REQ_ACQUIRE_READ_STEP);
+
if (Y_UNLIKELY(Stopping)) {
// We won't be able to commit anyway
return;
}
+ if (ReadOnlyLeaseEnabled()) {
+ // We acquire read step using a read-only lease from executor
+ // It is guaranteed that any future generation was not running at
+ // the time ConfirmReadOnlyLease was called.
+ TActorId sender = ev->Sender;
+ ui64 cookie = ev->Cookie;
+ ui64 step = Max(VolatileState.LastSentStep, VolatileState.LastAcquired);
+ VolatileState.LastAcquired = step;
+ Executor()->ConfirmReadOnlyLease([this, sender, cookie, step]() {
+ Send(sender, new TEvTxProxy::TEvAcquireReadStepResult(TabletID(), step), 0, cookie);
+ });
+ return;
+ }
+
VolatileState.AcquireReadStepPending.emplace_back(ev->Sender, ev->Cookie);
MaybeFlushAcquireReadStep(ctx);
}
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 1fbc46f449..49aa706c77 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -50,6 +50,8 @@ const ui32 TTxCoordinator::Schema::CurrentVersion = 1;
TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
+ , EnableLeaderLeases(0, 0, 1)
+ , MinLeaderLeaseDurationUs(250000, 1000, 5000000)
#ifdef COORDINATOR_LOG_TO_FILE
, DebugName(Sprintf("/tmp/coordinator_db_log_%" PRIu64 ".%" PRIi32 ".%" PRIu64 ".gz", TabletID(), getpid(), tablet.LocalId()))
, DebugLogFile(DebugName)
@@ -316,7 +318,28 @@ void TTxCoordinator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, cons
}
}
+void TTxCoordinator::IcbRegister() {
+ if (!IcbRegistered) {
+ AppData()->Icb->RegisterSharedControl(EnableLeaderLeases, "CoordinatorControls.EnableLeaderLeases");
+ AppData()->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "CoordinatorControls.MinLeaderLeaseDurationUs");
+ IcbRegistered = true;
+ }
+}
+
+bool TTxCoordinator::ReadOnlyLeaseEnabled() {
+ IcbRegister();
+ ui64 value = EnableLeaderLeases;
+ return value != 0;
+}
+
+TDuration TTxCoordinator::ReadOnlyLeaseDuration() {
+ IcbRegister();
+ ui64 value = MinLeaderLeaseDurationUs;
+ return TDuration::MicroSeconds(value);
+}
+
void TTxCoordinator::OnActivateExecutor(const TActorContext &ctx) {
+ IcbRegister();
TryInitMonCounters(ctx);
Executor()->RegisterExternalTabletCounters(TabletCountersPtr);
Execute(CreateTxSchema(), ctx);
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index 28a817ae46..82f5f4b4d2 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -8,6 +8,8 @@
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tx_processing.h>
+#include <ydb/core/control/immediate_control_board_wrapper.h>
+#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -461,6 +463,10 @@ private:
i64 CurrentTxInFly;
};
+ bool IcbRegistered = false;
+ TControlWrapper EnableLeaderLeases;
+ TControlWrapper MinLeaderLeaseDurationUs;
+
TVolatileState VolatileState;
TConfig Config;
TCoordinatorMonCounters MonCounters;
@@ -503,6 +509,14 @@ private:
return TActor::Die(ctx);
}
+ void IcbRegister();
+ bool ReadOnlyLeaseEnabled() override;
+ TDuration ReadOnlyLeaseDuration() override;
+
+ void IncCounter(NFlatTxCoordinator::ECumulativeCounters counter, ui64 num = 1) {
+ TabletCounters->Cumulative()[counter].Increment(num);
+ }
+
void OnActivateExecutor(const TActorContext &ctx) override;
void DefaultSignalTabletActive(const TActorContext &ctx) override {