diff options
34 files changed, 1000 insertions, 68 deletions
diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h index 602e39c600..3b97f4fe09 100644 --- a/ydb/core/base/tablet.h +++ b/ydb/core/base/tablet.h @@ -52,6 +52,7 @@ struct TEvTablet { EvFollowerSyncComplete, // from leader to user tablet when all old followers are touched and synced EvCutTabletHistory, EvUpdateConfig, + EvDropLease, EvCommit = EvBoot + 512, EvAux, @@ -60,6 +61,7 @@ struct TEvTablet { EvTabletActive, EvPromoteToLeader, EvFGcAck, // from user tablet to follower + EvLeaseDropped, EvTabletDead = EvBoot + 1024, EvFollowerUpdateState, // notifications to guardian @@ -101,6 +103,16 @@ struct TEvTablet { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET)"); + struct TCommitMetadata { + ui32 Key; + TString Data; + + TCommitMetadata(ui32 key, TString data) + : Key(key) + , Data(std::move(data)) + { } + }; + struct TDependencyGraph : public TThrRefBase { struct TEntry { std::pair<ui32, ui32> Id; @@ -111,28 +123,29 @@ struct TEvTablet { TVector<TLogoBlobID> GcLeft; TString EmbeddedLogBody; - - TEntry() - : IsSnapshot(false) - {} - - void Set(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &refs, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Id = id; - References.swap(refs); - IsSnapshot = isSnapshot; - GcDiscovered.swap(gcDiscovered); - GcLeft.swap(gcLeft); - EmbeddedLogBody.clear(); - } - - void Set(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Id = id; - References.clear(); - IsSnapshot = false; - GcDiscovered.swap(gcDiscovered); - GcLeft.swap(gcLeft); - EmbeddedLogBody = embeddedLogBody; - } + TVector<TCommitMetadata> EmbeddedMetadata; + + TEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + : Id(id) + , 
IsSnapshot(isSnapshot) + , References(std::move(refs)) + , GcDiscovered(std::move(gcDiscovered)) + , GcLeft(std::move(gcLeft)) + , EmbeddedMetadata(std::move(metadata)) + { } + + TEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + : Id(id) + , IsSnapshot(false) + , GcDiscovered(std::move(gcDiscovered)) + , GcLeft(std::move(gcLeft)) + , EmbeddedLogBody(std::move(embeddedLogBody)) + , EmbeddedMetadata(std::move(metadata)) + { } }; std::pair<ui32, ui32> Snapshot; @@ -148,19 +161,23 @@ struct TEvTablet { << " entries " << Entries.size() << "}"; } - void AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &references, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { + TEntry& AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + { if (isSnapshot) { Snapshot = id; Entries.clear(); } - Entries.push_back(TEntry()); - Entries.back().Set(id, references, isSnapshot, gcDiscovered, gcLeft); + return Entries.emplace_back(id, std::move(refs), isSnapshot, std::move(gcDiscovered), std::move(gcLeft), std::move(metadata)); } - void AddEntry(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Entries.push_back(TEntry()); - Entries.back().Set(id, embeddedLogBody, gcDiscovered, gcLeft); + TEntry& AddEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + { + return Entries.emplace_back(id, std::move(embeddedLogBody), std::move(gcDiscovered), std::move(gcLeft), std::move(metadata)); } void Invalidate() { @@ -269,6 +286,8 @@ struct TEvTablet { TString EmbeddedLogBody; TString 
FollowerAux; + TVector<TCommitMetadata> EmbeddedMetadata; + TEvCommit(ui64 tabletId, ui32 gen, ui32 step, const TVector<ui32> &dependsOn, bool isSnapshot , bool preCommited = false , TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticMinLatency) @@ -777,6 +796,22 @@ struct TEvTablet { }; struct TEvTabletStopped : TEventLocal<TEvTabletStopped, EvTabletStopped> {}; + + struct TEvDropLease : TEventPB<TEvDropLease, NKikimrTabletBase::TEvDropLease, EvDropLease> { + TEvDropLease() = default; + + explicit TEvDropLease(ui64 tabletId) { + Record.SetTabletID(tabletId); + } + }; + + struct TEvLeaseDropped : TEventPB<TEvLeaseDropped, NKikimrTabletBase::TEvLeaseDropped, EvLeaseDropped> { + TEvLeaseDropped() = default; + + explicit TEvLeaseDropped(ui64 tabletId) { + Record.SetTabletID(tabletId); + } + }; }; IActor* CreateTabletKiller(ui64 tabletId, ui32 nodeId = 0, ui32 maxGeneration = Max<ui32>()); diff --git a/ydb/core/cms/console/immediate_controls_configurator.cpp b/ydb/core/cms/console/immediate_controls_configurator.cpp index af3ad24217..4f959a1891 100644 --- a/ydb/core/cms/console/immediate_controls_configurator.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator.cpp @@ -18,7 +18,8 @@ public: } TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg); + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls); void Bootstrap(const TActorContext &ctx); @@ -38,13 +39,15 @@ public: } private: - void CreateControls(TIntrusivePtr<TControlBoard> board); + void CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting); void CreateControls(TIntrusivePtr<TControlBoard> board, const google::protobuf::Descriptor *desc, - const TString &prefix); + const TString &prefix, + bool allowExisting); void AddControl(TIntrusivePtr<TControlBoard> board, const google::protobuf::FieldDescriptor *desc, - const TString &prefix); + const TString &prefix, + bool 
allowExisting); void ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg, TIntrusivePtr<TControlBoard> board); void ApplyConfig(const ::google::protobuf::Message &cfg, @@ -57,9 +60,10 @@ private: }; TImmediateControlsConfigurator::TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg) + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls) { - CreateControls(board); + CreateControls(board, allowExistingControls); ApplyConfig(cfg, board); } @@ -98,15 +102,16 @@ void TImmediateControlsConfigurator::Handle(TEvConsole::TEvConfigNotificationReq ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie); } -void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board) +void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting) { auto *desc = NKikimrConfig::TImmediateControlsConfig::descriptor(); - CreateControls(board, desc, ""); + CreateControls(board, desc, "", allowExisting); } void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, const google::protobuf::Descriptor *desc, - const TString &prefix) + const TString &prefix, + bool allowExisting) { for (int i = 0; i < desc->field_count(); ++i) { auto *fieldDesc = desc->field(i); @@ -117,20 +122,22 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> auto fieldType = fieldDesc->type(); if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64 || fieldType == google::protobuf::FieldDescriptor::TYPE_INT64) - AddControl(board, fieldDesc, prefix); + AddControl(board, fieldDesc, prefix, allowExisting); else { Y_VERIFY(fieldType == google::protobuf::FieldDescriptor::TYPE_MESSAGE, "Only [u]int64 and message fields are allowed in Immediate Controls Config"); CreateControls(board, fieldDesc->message_type(), - MakePrefix(prefix, fieldDesc->name())); + MakePrefix(prefix, fieldDesc->name()), + 
allowExisting); } } } void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> board, const google::protobuf::FieldDescriptor *desc, - const TString &prefix) + const TString &prefix, + bool allowExisting) { auto &opts = desc->options().GetExtension(NKikimrConfig::ControlOptions); auto name = MakePrefix(prefix, desc->name()); @@ -139,11 +146,22 @@ void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> boa ui64 maxValue = opts.GetMaxValue(); Controls[name] = TControlWrapper(defaultValue, minValue, maxValue); - // All controls are actually shared but use RegisterLocalControl to - // make sure it is not registered by someone else with other options. - auto res = board->RegisterLocalControl(Controls[name], name); - Y_VERIFY_S(res, "Immediate Control " << name << " was registered before " - << "TImmediateControlsConfigurator creation"); + + // When we register a control it is possible that it has already been + // registered by some other code, in which case it may be used before it + // is properly configured. It can currently only happen in configurator + // tests, where it is created very late after some tablets have already + // started. + auto res = board->RegisterSharedControl(Controls[name], name); + Y_VERIFY_S(res || allowExisting, + "Immediate Control " << name << " was registered before " + << "TImmediateControlsConfigurator creation"); + if (Y_UNLIKELY(!res)) { + Cerr << "WARNING: immediate control " << name << " was registered before " + << "TImmediateControlsConfigurator creation. " + << "A default value may have been used before it was configured." 
<< Endl; + Controls[name].Reset(defaultValue, minValue, maxValue); + } } void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg, @@ -192,9 +210,10 @@ TString TImmediateControlsConfigurator::MakePrefix(const TString &prefix, } IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg) + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls) { - return new TImmediateControlsConfigurator(board, cfg); + return new TImmediateControlsConfigurator(board, cfg, allowExistingControls); } } // namespace NConsole diff --git a/ydb/core/cms/console/immediate_controls_configurator.h b/ydb/core/cms/console/immediate_controls_configurator.h index c6c4ba029d..53f1b3e5d7 100644 --- a/ydb/core/cms/console/immediate_controls_configurator.h +++ b/ydb/core/cms/console/immediate_controls_configurator.h @@ -13,7 +13,8 @@ namespace NConsole { * immediate control board via CMS. 
*/ IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg); + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls = false); } // namespace NConsole } // namespace NKikimr diff --git a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp index 8ad8f6ed1b..2d2a9f8251 100644 --- a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp @@ -57,7 +57,8 @@ NKikimrConsole::TConfigItem ITEM_CONTROLS_EXCEED_MAX; void InitImmediateControlsConfigurator(TTenantTestRuntime &runtime) { runtime.Register(CreateImmediateControlsConfigurator(runtime.GetAppData().Icb, - NKikimrConfig::TImmediateControlsConfig())); + NKikimrConfig::TImmediateControlsConfig(), + /* allowExistingControls */ true)); TDispatchOptions options; options.FinalEvents.emplace_back(TEvConfigsDispatcher::EvSetConfigSubscriptionResponse, 1); runtime.DispatchEvents(options); diff --git a/ydb/core/cms/json_proxy_proto.h b/ydb/core/cms/json_proxy_proto.h index b1b87f30cd..9c66617066 100644 --- a/ydb/core/cms/json_proxy_proto.h +++ b/ydb/core/cms/json_proxy_proto.h @@ -73,6 +73,8 @@ protected: return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TDataShardControls::TExecutionProfileOptions::descriptor(), ctx); else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTxLimitControls") return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTxLimitControls::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TCoordinatorControls") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TCoordinatorControls::descriptor(), ctx); } ctx.Send(RequestEvent->Sender, diff --git a/ydb/core/control/immediate_control_board_control.cpp b/ydb/core/control/immediate_control_board_control.cpp index 
7762cf9d88..6acb68e985 100644 --- a/ydb/core/control/immediate_control_board_control.cpp +++ b/ydb/core/control/immediate_control_board_control.cpp @@ -15,6 +15,13 @@ void TControl::Set(TAtomicBase newValue) { AtomicSet(Default, newValue); } +void TControl::Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) { + Value = defaultValue; + Default = defaultValue; + LowerBound = lowerBound; + UpperBound = upperBound; +} + TAtomicBase TControl::SetFromHtmlRequest(TAtomicBase newValue) { TAtomicBase prevValue = AtomicGet(Value); if (newValue == AtomicGet(Default)) { diff --git a/ydb/core/control/immediate_control_board_control.h b/ydb/core/control/immediate_control_board_control.h index 7fd038b03a..381b9cb0fc 100644 --- a/ydb/core/control/immediate_control_board_control.h +++ b/ydb/core/control/immediate_control_board_control.h @@ -16,6 +16,7 @@ public: TControl(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound); void Set(TAtomicBase newValue); + void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound); TAtomicBase SetFromHtmlRequest(TAtomicBase newValue); diff --git a/ydb/core/control/immediate_control_board_impl.cpp b/ydb/core/control/immediate_control_board_impl.cpp index fa26926fae..06f6dab1a8 100644 --- a/ydb/core/control/immediate_control_board_impl.cpp +++ b/ydb/core/control/immediate_control_board_impl.cpp @@ -15,8 +15,14 @@ bool TControlBoard::RegisterLocalControl(TControlWrapper control, TString name) return result; } -void TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) { - control.Control = Board.InsertIfAbsent(name, control.Control); +bool TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) { + auto& ptr = Board.InsertIfAbsent(name, control.Control); + if (control.Control == ptr) { + return true; + } else { + control.Control = ptr; + return false; + } } void TControlBoard::RestoreDefaults() { diff --git 
a/ydb/core/control/immediate_control_board_impl.h b/ydb/core/control/immediate_control_board_impl.h index a01e09f4d6..f5463aa12d 100644 --- a/ydb/core/control/immediate_control_board_impl.h +++ b/ydb/core/control/immediate_control_board_impl.h @@ -14,7 +14,7 @@ private: public: bool RegisterLocalControl(TControlWrapper control, TString name); - void RegisterSharedControl(TControlWrapper& control, TString name); + bool RegisterSharedControl(TControlWrapper& control, TString name); void RestoreDefaults(); diff --git a/ydb/core/control/immediate_control_board_wrapper.h b/ydb/core/control/immediate_control_board_wrapper.h index ce8a6adde5..e2da0d80d0 100644 --- a/ydb/core/control/immediate_control_board_wrapper.h +++ b/ydb/core/control/immediate_control_board_wrapper.h @@ -29,6 +29,15 @@ public: bool IsTheSame(TControlWrapper another) { return Control == another.Control; } + + /** + * Resets an existing control to different values. + * + * WARNING: this method is not thread safe and may only be used during initialization. 
+ */ + void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) { + Control->Reset(defaultValue, lowerBound, upperBound); + } }; class TMemorizableControlWrapper { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index ebc7d0eaa3..e386092b17 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1216,8 +1216,22 @@ message TImmediateControlsConfig { DefaultValue: 60000 }]; } + message TCoordinatorControls { + optional uint64 EnableLeaderLeases = 1 [(ControlOptions) = { + Description: "Enables leader leases for processing read-only queries", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 MinLeaderLeaseDurationUs = 2 [(ControlOptions) = { + Description: "The minimum leader lease duration in microseconds", + MinValue: 1000, + MaxValue: 5000000, + DefaultValue: 250000 }]; + } + optional TDataShardControls DataShardControls = 1; optional TTxLimitControls TxLimitControls = 2; + optional TCoordinatorControls CoordinatorControls = 3; }; message TMeteringConfig { diff --git a/ydb/core/protos/counters_coordinator.proto b/ydb/core/protos/counters_coordinator.proto index 28433e3e1a..f5437245bb 100644 --- a/ydb/core/protos/counters_coordinator.proto +++ b/ydb/core/protos/counters_coordinator.proto @@ -12,7 +12,7 @@ enum ESimpleCounters { } enum ECumulativeCounters { - COUNTER_CUMULATIVE_IGNORE = 0; + COUNTER_REQ_ACQUIRE_READ_STEP = 0 [(CounterOpts) = {Name: "AcquireReadStepRequests"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto index f0bec238e7..9e51d432da 100644 --- a/ydb/core/protos/tablet.proto +++ b/ydb/core/protos/tablet.proto @@ -60,6 +60,11 @@ message TTabletTypes { } } +message TTabletLogMetadata { + optional uint32 Key = 1; + optional bytes Data = 2; +} + message TTabletLogEntry { // normal log entries repeated uint32 DependsOn = 1; @@ -79,6 +84,7 @@ message TTabletLogEntry { repeated fixed64 ZeroTailBitmask = 
12; optional bytes EmbeddedLogBody = 13; + repeated TTabletLogMetadata EmbeddedMetadata = 14; } message TTabletChannelInfo { @@ -238,3 +244,11 @@ message TEvTabletStop { optional uint64 TabletID = 1; optional EReason Reason = 2; } + +message TEvDropLease { + optional uint64 TabletID = 1; +} + +message TEvLeaseDropped { + optional uint64 TabletID = 1; +} diff --git a/ydb/core/tablet/tablet_req_rebuildhistory.cpp b/ydb/core/tablet/tablet_req_rebuildhistory.cpp index c0b4c0c2bc..620b25c1a8 100644 --- a/ydb/core/tablet/tablet_req_rebuildhistory.cpp +++ b/ydb/core/tablet/tablet_req_rebuildhistory.cpp @@ -44,6 +44,7 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil bool IsSnapshot; bool IsTotalSnapshot; TString EmbeddedLogBody; + TVector<TEvTablet::TCommitMetadata> EmbeddedMetadata; TVector<TLogoBlobID> GcDiscovered; TVector<TLogoBlobID> GcLeft; @@ -125,6 +126,14 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil GcLeft[i] = LogoBlobIDFromLogoBlobID(x.GetGcLeft(i)); } + if (const size_t metaSize = x.EmbeddedMetadataSize()) { + EmbeddedMetadata.reserve(metaSize); + for (size_t i = 0; i < metaSize; ++i) { + const auto& meta = x.GetEmbeddedMetadata(i); + EmbeddedMetadata.emplace_back(meta.GetKey(), meta.GetData()); + } + } + switch (Status) { case StatusUnknown: Status = StatusBody; @@ -679,9 +688,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil }()); if (entry.EmbeddedLogBody) - graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); else - graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); if 
(lastUnbrokenTailEntry + 1 == id.second) lastUnbrokenTailEntry = id.second; @@ -717,9 +726,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil }()); if (entry.EmbeddedLogBody) - graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); else - graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); hasSnapshotInGeneration |= entry.IsSnapshot; break; diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp index 3f3081f128..4537a4af91 100644 --- a/ydb/core/tablet/tablet_sys.cpp +++ b/ydb/core/tablet/tablet_sys.cpp @@ -1164,6 +1164,16 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) { entry->FollowerUpdate->Body = msg->EmbeddedLogBody; } + if (!msg->EmbeddedMetadata.empty()) { + auto *m = x->MutableEmbeddedMetadata(); + m->Reserve(msg->EmbeddedMetadata.size()); + for (const auto &meta : msg->EmbeddedMetadata) { + auto *p = m->Add(); + p->SetKey(meta.Key); + p->SetData(meta.Data); + } + } + if (saveFollowerUpdate && msg->FollowerAux) entry->FollowerUpdate->AuxPayload = msg->FollowerAux; diff --git a/ydb/core/tablet_flat/CMakeLists.txt b/ydb/core/tablet_flat/CMakeLists.txt index 93bdf75605..b5b4b43084 100644 --- a/ydb/core/tablet_flat/CMakeLists.txt +++ b/ydb/core/tablet_flat/CMakeLists.txt @@ -35,6 +35,7 @@ target_proto_messages(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor.proto ) target_sources(ydb-core-tablet_flat PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_boot_lease.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_boot_misc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp diff --git a/ydb/core/tablet_flat/flat_boot_cookie.h b/ydb/core/tablet_flat/flat_boot_cookie.h index 58e810325a..4ff737c2bd 100644 --- a/ydb/core/tablet_flat/flat_boot_cookie.h +++ b/ydb/core/tablet_flat/flat_boot_cookie.h @@ -68,6 +68,10 @@ namespace NBoot { const ui32 Raw = 0; /* only lower 24 bits are used */ }; + enum class ELogCommitMeta : ui32 { + LeaseInfo = 1, + }; + } } } diff --git a/ydb/core/tablet_flat/flat_boot_lease.cpp b/ydb/core/tablet_flat/flat_boot_lease.cpp new file mode 100644 index 0000000000..0fe6966b49 --- /dev/null +++ b/ydb/core/tablet_flat/flat_boot_lease.cpp @@ -0,0 +1,110 @@ +#include "flat_executor_bootlogic.h" +#include <ydb/core/util/pb.h> +#include <library/cpp/actors/interconnect/interconnect.h> + +namespace NKikimr { +namespace NTabletFlatExecutor { + +class TLeaseWaiter : public TActorBootstrapped<TLeaseWaiter> { +public: + TLeaseWaiter( + const TActorId& owner, + TMonotonic bootTimestamp, + const TActorId& leaseHolder, + TDuration leaseDuration) + : Owner(owner) + , BootTimestamp(bootTimestamp) + , LeaseHolder(leaseHolder) + , LeaseDuration(leaseDuration) + { } + + void Bootstrap() { + SendRequest(); + Schedule(BootTimestamp + LeaseDuration * 2, new TEvents::TEvWakeup); + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + sFunc(TEvents::TEvWakeup, HandleTimeout); + hFunc(TEvTablet::TEvLeaseDropped, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } + + void SendRequest() { + Send(LeaseHolder, new TEvTablet::TEvDropLease(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); + } + + void Finish() { + Send(Owner, new TEvTablet::TEvLeaseDropped()); + PassAway(); + } + + void HandleTimeout() { + Finish(); + } + + void 
Handle(TEvTablet::TEvLeaseDropped::TPtr&) { + Finish(); + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr&) { + // We don't need to do anything + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr&) { + // Keep retrying until timeout is reached + SendRequest(); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + auto* msg = ev->Get(); + Y_VERIFY(ev->Sender == LeaseHolder); + Y_VERIFY(msg->SourceType == TEvTablet::TEvDropLease::EventType); + if (msg->Reason == TEvents::TEvUndelivered::ReasonActorUnknown) { + // We have proved lease holder no longer exists + return Finish(); + } + + // We may get undelivered notifications due to disconnections + // Since we cannot trust them we expect to retry on TEvNodeDisconnected + } + +private: + const TActorId Owner; + const TMonotonic BootTimestamp; + const TActorId LeaseHolder; + const TDuration LeaseDuration; +}; + +void TExecutorBootLogic::StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept +{ + TActorId leaseHolder; + TDuration leaseDuration; + + for (const auto& entry : graph.Entries) { + for (const auto& meta : entry.EmbeddedMetadata) { + if (NBoot::ELogCommitMeta(meta.Key) == NBoot::ELogCommitMeta::LeaseInfo) { + TProtoBox<NKikimrExecutorFlat::TLeaseInfoMetadata> proto(meta.Data); + if (proto.HasLeaseHolder()) { + leaseHolder = ActorIdFromProto(proto.GetLeaseHolder()); + } + if (proto.HasLeaseDurationUs()) { + leaseDuration = TDuration::MicroSeconds(proto.GetLeaseDurationUs()); + } + } + } + } + + if (leaseHolder && leaseDuration) { + LeaseWaiter = Ops->RegisterWithSameMailbox(new TLeaseWaiter(SelfId, bootTimestamp, leaseHolder, leaseDuration)); + } +} + +} // namespace NTabletFlatExecutor +} // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_exec_commit.h b/ydb/core/tablet_flat/flat_exec_commit.h index 7212449c25..a1402dcf24 100644 --- a/ydb/core/tablet_flat/flat_exec_commit.h +++ b/ydb/core/tablet_flat/flat_exec_commit.h @@ -1,6 +1,7 @@ 
#pragma once #include <ydb/core/base/logoblob.h> +#include <ydb/core/base/tablet.h> #include <util/generic/vector.h> #include <util/generic/string.h> @@ -53,6 +54,7 @@ namespace NTabletFlatExecutor { TString FollowerAux; TVector<TLogoBlob> Refs; TGCBlobDelta GcDelta; + TVector<TEvTablet::TCommitMetadata> Metadata; TSeat *FirstTx = nullptr; TSeat *LastTx = nullptr; }; diff --git a/ydb/core/tablet_flat/flat_exec_commit_mgr.h b/ydb/core/tablet_flat/flat_exec_commit_mgr.h index 20f5d5ffe1..caeabb014f 100644 --- a/ydb/core/tablet_flat/flat_exec_commit_mgr.h +++ b/ydb/core/tablet_flat/flat_exec_commit_mgr.h @@ -175,6 +175,7 @@ namespace NTabletFlatExecutor { ev->FollowerAux = std::move(commit.FollowerAux); ev->GcDiscovered = std::move(commit.GcDelta.Created); ev->GcLeft = std::move(commit.GcDelta.Deleted); + ev->EmbeddedMetadata = std::move(commit.Metadata); Ops->Send(Owner, ev, 0, ui64(commit.Type)); } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index e38dda8373..7f8c5550ad 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -408,6 +408,15 @@ void TExecutor::Active(const TActorContext &ctx) { CompactionLogic->UpdateInMemStatsStep(it.first, 0, Database->GetTableMemSize(it.first)); UpdateCompactions(); + + LeaseEnabled = Owner->ReadOnlyLeaseEnabled(); + if (LeaseEnabled) { + LeaseDuration = Owner->ReadOnlyLeaseDuration(); + if (!LeaseDuration) { + LeaseEnabled = false; + } + } + MakeLogSnapshot(); if (auto error = CheckBorrowConsistency()) { @@ -432,8 +441,6 @@ void TExecutor::TranscriptBootOpResult(ui32 res, const TActorContext &ctx) { case TExecutorBootLogic::OpResultComplete: return Active(ctx); case TExecutorBootLogic::OpResultBroken: - BootLogic.Destroy(); - if (auto logl = Logger->Log(ELnLev::Error)) { logl << NFmt::Do(*this) << " Broken while booting"; } @@ -453,10 +460,10 @@ void TExecutor::TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ct case 
TExecutorBootLogic::OpResultComplete: return ActivateFollower(ctx); case TExecutorBootLogic::OpResultBroken: - BootLogic.Destroy(); if (auto logl = Logger->Log(ELnLev::Error)) { logl << NFmt::Do(*this) << " Broken while follower booting"; } + return Broken(); default: Y_FAIL("unknown boot result"); @@ -588,6 +595,7 @@ TExecutorCaches TExecutor::CleanupState() { TExecutorCaches caches; if (BootLogic) { + BootLogic->Cancel(); caches = BootLogic->DetachCaches(); } else { if (PrivatePageCache) { @@ -1395,6 +1403,85 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } } +TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + Y_VERIFY(at >= LeaseEnd); + + if (!LeaseEnabled) { + // Automatically enable leases + LeaseEnabled = true; + LeaseDuration = Owner->ReadOnlyLeaseDuration(); + Y_VERIFY(LeaseDuration); + } + + // Try to find a suitable commit that is already in flight + TLeaseCommit* lease = nullptr; + for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) { + if (at < it->LeaseEnd) { + lease = &*it; + } else { + break; + } + } + + if (!lease) { + if (LeaseDropped) { + // We cannot start new lease confirmations + return nullptr; + } + + LogicRedo->FlushBatchedLog(); + + auto commit = CommitManager->Begin(true, ECommit::Misc); + + NKikimrExecutorFlat::TLeaseInfoMetadata proto; + ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); + proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); + + TString data; + bool ok = proto.SerializeToString(&data); + Y_VERIFY(ok); + + commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + + CommitManager->Commit(commit); + } + + return lease; +} + +void TExecutor::ConfirmReadOnlyLease(TMonotonic at) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + 
LeaseUsed = true; + + if (LeaseEnabled && at < LeaseEnd) { + return; + } + + EnsureReadOnlyLease(at); +} + +void TExecutor::ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + LeaseUsed = true; + + if (LeaseEnabled && at < LeaseEnd) { + callback(); + return; + } + + if (auto* lease = EnsureReadOnlyLease(at)) { + lease->Callbacks.push_back(std::move(callback)); + } +} + +void TExecutor::ConfirmReadOnlyLease(std::function<void()> callback) { + ConfirmReadOnlyLease(AppData()->MonotonicTimeProvider->Now(), std::move(callback)); +} + bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction; } @@ -2389,6 +2476,21 @@ void TExecutor::MakeLogSnapshot() { GcLogic->SnapToLog(snap, commit->Step); LogicSnap->MakeSnap(snap, *commit, Logger.Get()); + if (LeaseEnabled) { + NKikimrExecutorFlat::TLeaseInfoMetadata proto; + ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); + proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); + + TString data; + bool ok = proto.SerializeToString(&data); + Y_VERIFY(ok); + + commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + } + CommitManager->Commit(commit); CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage()); @@ -2605,6 +2707,41 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) { } } +void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx) { + TMonotonic ts = AppData(ctx)->MonotonicTimeProvider->Now(); + + LeaseDropped = true; + LeaseEnd = Min(LeaseEnd, ts); + + for (auto& l : LeaseCommits) { + l.LeaseEnd = Min(l.LeaseEnd, ts); + } + + ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped); + Owner->ReadOnlyLeaseDropped(); +} + +void 
TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &) { + Y_VERIFY(LeaseExtendPending); + LeaseExtendPending = false; + + if (!LeaseCommits.empty() || !LeaseEnabled || LeaseDropped) { + return; + } + + if (LeaseUsed) { + LeaseUsed = false; + UnusedLeaseExtensions = 0; + } else if (UnusedLeaseExtensions >= 5) { + return; + } else { + ++UnusedLeaseExtensions; + } + + // Start a new lease extension commit + EnsureReadOnlyLease(LeaseEnd); +} + void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx) { TEvTablet::TEvCommitResult *msg = ev->Get(); @@ -2631,6 +2768,41 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext << " for step " << step; } + if (!LeaseCommits.empty()) { + auto& l = LeaseCommits.front(); + Y_VERIFY(step <= l.Step); + if (step == l.Step) { + LeasePersisted = true; + LeaseEnd = Max(LeaseEnd, l.LeaseEnd); + + while (!l.Callbacks.empty()) { + // Note: callback may (though unlikely) recursively add more callbacks + TVector<std::function<void()>> callbacks; + callbacks.swap(l.Callbacks); + for (auto& callback : callbacks) { + callback(); + } + } + + LeaseCommits.pop_front(); + + // Calculate a full round-trip latency for leases + // When this latency is larger than third of lease duration we want + // to increase lease duration so we would have enough time for + // processing read-only requests without additional commits + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + if ((LeaseEnd - ts) < LeaseDuration / 3) { + LeaseDuration *= 2; + } + + // We want to schedule a new commit before the lease expires + if (LeaseCommits.empty() && !LeaseExtendPending) { + Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend); + LeaseExtendPending = true; + } + } + } + switch (cookie) { case ECommit::Redo: { @@ -3562,11 +3734,13 @@ STFUNC(TExecutor::StateWork) { CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters); cFunc(TEvPrivate::EvCheckYellow, UpdateYellow); 
cFunc(TEvPrivate::EvUpdateCompactions, UpdateCompactions); + HFunc(TEvPrivate::TEvLeaseExtend, Handle); HFunc(TEvents::TEvWakeup, Wakeup); hFunc(TEvents::TEvFlushLog, Handle); hFunc(NSharedCache::TEvRequest, Handle); hFunc(NSharedCache::TEvResult, Handle); hFunc(NSharedCache::TEvUpdated, Handle); + HFunc(TEvTablet::TEvDropLease, Handle); HFunc(TEvTablet::TEvCommitResult, Handle); hFunc(TEvTablet::TEvCheckBlobstorageStatusResult, Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index df08b3a338..3a61e5385d 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -334,6 +334,7 @@ class TExecutor EvActivateCompactionRead, EvActivateCompactionChanges, EvBrokenTransaction, + EvLeaseExtend, EvEnd }; @@ -347,6 +348,7 @@ class TExecutor struct TEvActivateCompactionRead : public TEventLocal<TEvActivateCompactionRead, EvActivateCompactionRead> {}; struct TEvActivateCompactionChanges : public TEventLocal<TEvActivateCompactionChanges, EvActivateCompactionChanges> {}; struct TEvBrokenTransaction : public TEventLocal<TEvBrokenTransaction, EvBrokenTransaction> {}; + struct TEvLeaseExtend : public TEventLocal<TEvLeaseExtend, EvLeaseExtend> {}; }; const TIntrusivePtr<ITimeProvider> Time = nullptr; @@ -356,6 +358,37 @@ class TExecutor ui32 FollowerId = 0; + // This becomes true when executor enables the use of leases, e.g. 
starts persisting them + // This may become false again when leases are not actively used for some time + bool LeaseEnabled = false; + // As soon as lease is persisted we may theoretically use read-only checks for lease prolongation + bool LeasePersisted = false; + // When lease is dropped we must stop accepting new lease-dependent requests + bool LeaseDropped = false; + // When lease is used in any given cycle this becomes true + bool LeaseUsed = false; + // This flag marks when TEvLeaseExtend message is already pending + bool LeaseExtendPending = false; + TDuration LeaseDuration; + TMonotonic LeaseEnd; + // Counts the number of times an unused lease has been extended + size_t UnusedLeaseExtensions = 0; + + struct TLeaseCommit { + const ui32 Step; + const TMonotonic Start; + TMonotonic LeaseEnd; + TVector<std::function<void()>> Callbacks; + + TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd) + : Step(step) + , Start(start) + , LeaseEnd(leaseEnd) + { } + }; + + TList<TLeaseCommit> LeaseCommits; + using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>; THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue; THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> PendingQueue; @@ -502,6 +535,8 @@ class TExecutor void ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch); void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx); + void Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPrivate::TEvLeaseExtend::TPtr &ev, const TActorContext &ctx); void Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx); @@ -575,6 +610,11 @@ public: void DetachTablet(const TActorContext &ctx) override; void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override; + TLeaseCommit* 
EnsureReadOnlyLease(TMonotonic at); + void ConfirmReadOnlyLease(TMonotonic at) override; + void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override; + void ConfirmReadOnlyLease(std::function<void()> callback) override; + TString BorrowSnapshot(ui32 tableId, const TTableSnapshotContext& snap, TRawVals from, TRawVals to, ui64 loaner) const override; ui64 MakeScanSnapshot(ui32 table) override; diff --git a/ydb/core/tablet_flat/flat_executor.proto b/ydb/core/tablet_flat/flat_executor.proto index 7758812187..0b865d16df 100644 --- a/ydb/core/tablet_flat/flat_executor.proto +++ b/ydb/core/tablet_flat/flat_executor.proto @@ -1,6 +1,7 @@ import "ydb/core/protos/base.proto"; import "ydb/core/protos/tablet.proto"; import "ydb/core/protos/flat_scheme_op.proto"; +import "library/cpp/actors/protos/actors.proto"; package NKikimrExecutorFlat; option java_package = "ru.yandex.kikimr.proto"; @@ -247,3 +248,13 @@ message TDatabaseBorrowPart { repeated TPartInfo Parts = 3; repeated TTxStatus TxStatusParts = 4; } + +message TLeaseInfoMetadata { + // Actor id of the lease holder, which is an executor actor + optional NActorsProto.TActorId LeaseHolder = 1; + + // Lease duration in microseconds. Lease holder may perform leader-only + // tasks up to this duration since the last channel 0 block confirmation, + // and expects future generations will not interfere. 
+ optional uint32 LeaseDurationUs = 2; +} diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp index cf72960f9c..4292a61d3c 100644 --- a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp @@ -103,9 +103,14 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot( TEvTablet::TEvBoot::TPtr &ev, TExecutorCaches &&caches) { - PrepareEnv(false, ev->Get()->Generation, std::move(caches)); + TEvTablet::TEvBoot *msg = ev->Get(); + PrepareEnv(false, msg->Generation, std::move(caches)); - Steps->Spawn<NBoot::TStages>(std::move(ev->Get()->DependencyGraph), nullptr); + if (msg->DependencyGraph) { + StartLeaseWaiter(BootTimestamp, *msg->DependencyGraph); + } + + Steps->Spawn<NBoot::TStages>(std::move(msg->DependencyGraph), nullptr); Steps->Execute(); return CheckCompletion(); @@ -113,7 +118,7 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot( void TExecutorBootLogic::PrepareEnv(bool follower, ui32 gen, TExecutorCaches caches) noexcept { - BootStartTime = TAppData::TimeProvider->Now(); + BootTimestamp = AppData()->MonotonicTimeProvider->Now(); auto *sys = TlsActivationContext->ExecutorThread.ActorSystem; auto *logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_FLATBOOT); @@ -212,9 +217,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::CheckCompletion() if (Loads) return OpResultContinue; + if (LeaseWaiter) + return OpResultContinue; + if (State().Follower || Restored) { if (auto logl = Steps->Logger()->Log(ELnLev::Info)) { - auto spent = TAppData::TimeProvider->Now() - BootStartTime; + auto spent = AppData()->MonotonicTimeProvider->Now() - BootTimestamp; logl << NFmt::Do(State()) << " booting completed" @@ -278,6 +286,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl step.Drop(); Steps->Execute(); + } else if (auto *msg = ev.CastAsLocal<TEvTablet::TEvLeaseDropped>()) { + if (LeaseWaiter != 
ev.Sender) { + return OpResultUnhandled; + } + + LeaseWaiter = { }; } else { return OpResultUnhandled; } @@ -291,6 +305,9 @@ TAutoPtr<NBoot::TResult> TExecutorBootLogic::ExtractState() noexcept { } void TExecutorBootLogic::Cancel() { + if (LeaseWaiter) { + Ops->Send(LeaseWaiter, new TEvents::TEvPoison); + } } void TExecutorBootLogic::FollowersSyncComplete() { diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.h b/ydb/core/tablet_flat/flat_executor_bootlogic.h index 34edb2d6b4..56084c36b0 100644 --- a/ydb/core/tablet_flat/flat_executor_bootlogic.h +++ b/ydb/core/tablet_flat/flat_executor_bootlogic.h @@ -74,8 +74,9 @@ private: TAutoPtr<NBoot::TBack> State_; TAutoPtr<NBoot::TResult> Result_; TAutoPtr<NBoot::TRoot> Steps; + TActorId LeaseWaiter; - TInstant BootStartTime; + TMonotonic BootTimestamp; const TIntrusiveConstPtr<TTabletStorageInfo> Info; @@ -91,6 +92,7 @@ private: EOpResult CheckCompletion(); void PrepareEnv(bool follower, ui32 generation, TExecutorCaches caches) noexcept; + void StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept; ui32 GetBSGroupFor(const TLogoBlobID &logo) const; ui32 GetBSGroupID(ui32 channel, ui32 generation); void LoadEntry(TIntrusivePtr<NBoot::TLoadBlobs>); diff --git a/ydb/core/tablet_flat/flat_executor_leases_ut.cpp b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp new file mode 100644 index 0000000000..3761f4fcd0 --- /dev/null +++ b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp @@ -0,0 +1,365 @@ +#include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/base/statestorage.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NTabletFlatExecutor { + +Y_UNIT_TEST_SUITE(TFlatExecutorLeases) { + + struct TEvLeasesTablet { + enum EEv { + EvWrite = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvWriteAck, + 
EvRead, + EvReadResult, + }; + + struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> { + TEvWrite(ui64 key, ui64 value) + : Key(key) + , Value(value) + { } + + const ui64 Key; + const ui64 Value; + }; + + struct TEvWriteAck : public TEventLocal<TEvWriteAck, EvWriteAck> { + TEvWriteAck(ui64 key) + : Key(key) + { } + + const ui64 Key; + }; + + struct TEvRead : public TEventLocal<TEvRead, EvRead> { + TEvRead(ui64 key) + : Key(key) + { } + + const ui64 Key; + }; + + struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> { + TEvReadResult(ui64 key, ui64 value) + : Key(key) + , Value(value) + { } + + const ui64 Key; + const ui64 Value; + }; + }; + + class TLeasesTablet + : public TActor<TLeasesTablet> + , public TTabletExecutedFlat + { + public: + TLeasesTablet(const TActorId& tablet, TTabletStorageInfo* info, bool enableInitialLease) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, nullptr) + , EnableInitialLease(enableInitialLease) + { } + + private: + struct Schema : NIceDb::Schema { + struct Data : Table<1> { + struct Key : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Value : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<Key>; + using TColumns = TableColumns<Key, Value>; + }; + + using TTables = SchemaTables<Data>; + }; + + using TTxBase = TTransactionBase<TLeasesTablet>; + + struct TTxInitSchema : public TTxBase { + TTxInitSchema(TSelf* self) + : TTxBase(self) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + NIceDb::TNiceDb db(txc.DB); + db.Materialize<Schema>(); + Self->RunTxInit(); + return true; + } + + void Complete(const TActorContext&) { + // nothing + } + }; + + void RunTxInitSchema() { + Execute(new TTxInitSchema(this)); + } + + struct TTxInit : public TTxBase { + TTxInit(TSelf* self) + : TTxBase(self) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + NIceDb::TNiceDb db(txc.DB); + + auto rowset = db.Table<Schema::Data>().Select(); + if 
(!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + ui64 key = rowset.GetValue<Schema::Data::Key>(); + ui64 value = rowset.GetValue<Schema::Data::Value>(); + Self->Data[key] = value; + if (!rowset.Next()) { + return false; + } + } + + return true; + } + + void Complete(const TActorContext& ctx) { + Self->SwitchToWork(ctx); + } + }; + + void RunTxInit() { + Execute(new TTxInit(this)); + } + + struct TTxWrite : public TTxBase { + TTxWrite(TSelf* self, TEvLeasesTablet::TEvWrite::TPtr&& ev) + : TTxBase(self) + , Ev(std::move(ev)) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + auto* msg = Ev->Get(); + + NIceDb::TNiceDb db(txc.DB); + + db.Table<Schema::Data>().Key(msg->Key).Update( + NIceDb::TUpdate<Schema::Data::Value>(msg->Value)); + return true; + } + + void Complete(const TActorContext& ctx) { + auto* msg = Ev->Get(); + Self->Data[msg->Key] = msg->Value; + + ctx.Send(Ev->Sender, new TEvLeasesTablet::TEvWriteAck(msg->Key), 0, Ev->Cookie); + } + + const TEvLeasesTablet::TEvWrite::TPtr Ev; + }; + + void Handle(TEvLeasesTablet::TEvWrite::TPtr& ev) { + Execute(new TTxWrite(this, std::move(ev))); + } + + void Handle(TEvLeasesTablet::TEvRead::TPtr& ev) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto key = ev->Get()->Key; + auto value = Data[key]; + Executor()->ConfirmReadOnlyLease([this, sender, cookie, key, value]() { + Send(sender, new TEvLeasesTablet::TEvReadResult(key, value), 0, cookie); + }); + } + + private: + void OnDetach(const TActorContext& ctx) override { + Die(ctx); + } + + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext& ctx) override { + Die(ctx); + } + + void OnActivateExecutor(const TActorContext&) override { + Become(&TThis::StateWork); + RunTxInitSchema(); + } + + void DefaultSignalTabletActive(const TActorContext&) override { + // nothing + } + + void SwitchToWork(const TActorContext& ctx) { + SignalTabletActive(ctx); + } + + bool ReadOnlyLeaseEnabled() override { 
+ return EnableInitialLease; + } + + private: + STFUNC(StateInit) { + StateInitImpl(ev, ctx); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvLeasesTablet::TEvWrite, Handle); + hFunc(TEvLeasesTablet::TEvRead, Handle); + default: + HandleDefaultEvents(ev, ctx); + } + } + + private: + const bool EnableInitialLease; + THashMap<ui64, ui64> Data; + }; + + void DoBasics(bool deliverDropLease, bool enableInitialLease) { + TTestBasicRuntime runtime(2); + SetupTabletServices(runtime); + runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + + const ui64 tabletId = TTestTxConfig::TxTablet0; + + auto boot1 = CreateTestBootstrapper(runtime, + CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY), + [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) { + return new TLeasesTablet(tablet, info, enableInitialLease); + }); + runtime.EnableScheduleForActor(boot1); + + { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1)); + runtime.DispatchEvents(options); + } + + auto sender = runtime.AllocateEdgeActor(0); + auto pipe1 = runtime.ConnectToPipe(tabletId, sender, 0, NTabletPipe::TClientRetryPolicy::WithRetries()); + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvWrite(1, 11)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvWriteAck>(sender); + } + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + bool blockDropLease = true; + TVector<THolder<IEventHandle>> dropLeaseMsgs; + auto observerFunc = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTablet::TEvDropLease::EventType: + if (blockDropLease) { + 
dropLeaseMsgs.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + case TEvTablet::TEvTabletDead::EventType: + // Prevent tablets from restarting + // This is most important for the boot1 actor, since it + // quickly receives bad commit signal and tries to restart + // the original tablet. However we prevent executor from + // killing itself too, so we could make additional queries. + return TTestActorRuntime::EEventAction::DROP; + case TEvTablet::TEvDemoted::EventType: + // Block guardian from telling tablet about a new generation + return TTestActorRuntime::EEventAction::DROP; + case TEvStateStorage::TEvReplicaLeaderDemoted::EventType: + // Block replica from telling tablet about a new generation + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserver = runtime.SetObserverFunc(observerFunc); + + auto boot2 = CreateTestBootstrapper(runtime, + CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY), + [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) { + return new TLeasesTablet(tablet, info, enableInitialLease); + }, + /* node index */ 1); + runtime.EnableScheduleForActor(boot2); + + { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1)); + runtime.DispatchEvents(options); + } + + // After the new tablet is restored, we should still be able to read from the old tablet for a while + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... 
waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + waitFor([&]{ return dropLeaseMsgs.size() == 1; }, "drop lease message"); + + // We expect the new tablet to send a drop lease message, but the old tablet must still be readable, because it didn't receive it yet + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + if (deliverDropLease) { + blockDropLease = false; + for (auto& ev : dropLeaseMsgs) { + runtime.Send(ev.Release(), 0, true); + } + dropLeaseMsgs.clear(); + } + + auto sender2 = runtime.AllocateEdgeActor(1); + auto pipe2 = runtime.ConnectToPipe(tabletId, sender2, 1, NTabletPipe::TClientRetryPolicy::WithRetries()); + runtime.SendToPipe(pipe2, sender2, new TEvLeasesTablet::TEvRead(1), 1); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender2); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender, TDuration::Seconds(1)); + UNIT_ASSERT(!ev); + } + } + + Y_UNIT_TEST(Basics) { + DoBasics(true, false); + } + + Y_UNIT_TEST(BasicsLeaseTimeout) { + DoBasics(false, false); + } + + Y_UNIT_TEST(BasicsInitialLease) { + DoBasics(true, true); + } + + Y_UNIT_TEST(BasicsInitialLeaseTimeout) { + DoBasics(false, true); + } +} + +} // namespace NKikimr::NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp index f69991ece9..258addbf7f 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp @@ -44,6 +44,18 @@ 
namespace NFlatExecutorSetup { if (launcherID) LauncherActorID = launcherID; } + + bool ITablet::ReadOnlyLeaseEnabled() { + return false; + } + + TDuration ITablet::ReadOnlyLeaseDuration() { + return TDuration::MilliSeconds(250); + } + + void ITablet::ReadOnlyLeaseDropped() { + // nothing by default + } } }} diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 075ebce354..babe00d863 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -452,6 +452,10 @@ namespace NFlatExecutorSetup { virtual void OnLeaderUserAuxUpdate(TString) { /* default */ } + virtual bool ReadOnlyLeaseEnabled(); + virtual TDuration ReadOnlyLeaseDuration(); + virtual void ReadOnlyLeaseDropped(); + // create transaction? protected: ITablet(TTabletStorageInfo *info, const TActorId &tablet) @@ -496,6 +500,10 @@ namespace NFlatExecutorSetup { virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0; + virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; + virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0; + virtual void ConfirmReadOnlyLease(std::function<void()> callback) = 0; + /* Make blob with data required for table bootstapping. Note: 1. Once non-trivial blob obtained and commited in tx all of its borrowed bundles have to be eventually released (see db). 
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt index 720279b57a..c5146c6184 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tablet_flat-ut PUBLIC ydb-core-scheme test-libs-exec test-libs-table + ydb-core-testlib udf-service-exception_policy ) target_sources(ydb-core-tablet_flat-ut PRIVATE @@ -32,6 +33,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_database_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_leases_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_sausagecache_ut.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt index dd2fe3b1bb..fff5984997 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tablet_flat-ut PUBLIC ydb-core-scheme test-libs-exec test-libs-table + ydb-core-testlib udf-service-exception_policy ) target_sources(ydb-core-tablet_flat-ut PRIVATE @@ -33,6 +34,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_database_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_leases_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_sausagecache_ut.cpp diff 
--git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 9b0c97c731..58d4d05cd3 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -207,9 +207,6 @@ namespace Tests { const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode; SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams); - CreateBootstrapTablets(); - SetupStorage(); - for (ui32 nodeIdx = 0; nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) { SetupDomainLocalService(nodeIdx); Runtime->GetAppData(nodeIdx).AuthConfig.MergeFrom(Settings->AuthConfig); @@ -223,6 +220,9 @@ namespace Tests { SetupConfigurators(nodeIdx); SetupProxies(nodeIdx); } + + CreateBootstrapTablets(); + SetupStorage(); } void TServer::SetupMessageBus(ui16 port, const TString &tracePath) { diff --git a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp index 8345d93485..e6a36f0007 100644 --- a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp +++ b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp @@ -96,11 +96,27 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE TEvAcquireReadStep"); + IncCounter(COUNTER_REQ_ACQUIRE_READ_STEP); + if (Y_UNLIKELY(Stopping)) { // We won't be able to commit anyway return; } + if (ReadOnlyLeaseEnabled()) { + // We acquire read step using a read-only lease from executor + // It is guaranteed that any future generation was not running at + // the time ConfirmReadOnlyLease was called. 
+ TActorId sender = ev->Sender; + ui64 cookie = ev->Cookie; + ui64 step = Max(VolatileState.LastSentStep, VolatileState.LastAcquired); + VolatileState.LastAcquired = step; + Executor()->ConfirmReadOnlyLease([this, sender, cookie, step]() { + Send(sender, new TEvTxProxy::TEvAcquireReadStepResult(TabletID(), step), 0, cookie); + }); + return; + } + VolatileState.AcquireReadStepPending.emplace_back(ev->Sender, ev->Cookie); MaybeFlushAcquireReadStep(ctx); } diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp index 1fbc46f449..49aa706c77 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.cpp +++ b/ydb/core/tx/coordinator/coordinator_impl.cpp @@ -50,6 +50,8 @@ const ui32 TTxCoordinator::Schema::CurrentVersion = 1; TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) + , EnableLeaderLeases(0, 0, 1) + , MinLeaderLeaseDurationUs(250000, 1000, 5000000) #ifdef COORDINATOR_LOG_TO_FILE , DebugName(Sprintf("/tmp/coordinator_db_log_%" PRIu64 ".%" PRIi32 ".%" PRIu64 ".gz", TabletID(), getpid(), tablet.LocalId())) , DebugLogFile(DebugName) @@ -316,7 +318,28 @@ void TTxCoordinator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, cons } } +void TTxCoordinator::IcbRegister() { + if (!IcbRegistered) { + AppData()->Icb->RegisterSharedControl(EnableLeaderLeases, "CoordinatorControls.EnableLeaderLeases"); + AppData()->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "CoordinatorControls.MinLeaderLeaseDurationUs"); + IcbRegistered = true; + } +} + +bool TTxCoordinator::ReadOnlyLeaseEnabled() { + IcbRegister(); + ui64 value = EnableLeaderLeases; + return value != 0; +} + +TDuration TTxCoordinator::ReadOnlyLeaseDuration() { + IcbRegister(); + ui64 value = MinLeaderLeaseDurationUs; + return TDuration::MicroSeconds(value); +} + void TTxCoordinator::OnActivateExecutor(const TActorContext &ctx) { + 
IcbRegister(); TryInitMonCounters(ctx); Executor()->RegisterExternalTabletCounters(TabletCountersPtr); Execute(CreateTxSchema(), ctx); diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h index 28a817ae46..82f5f4b4d2 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.h +++ b/ydb/core/tx/coordinator/coordinator_impl.h @@ -8,6 +8,8 @@ #include <library/cpp/actors/helpers/mon_histogram_helper.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/tx_processing.h> +#include <ydb/core/control/immediate_control_board_wrapper.h> +#include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_exception.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -461,6 +463,10 @@ private: i64 CurrentTxInFly; }; + bool IcbRegistered = false; + TControlWrapper EnableLeaderLeases; + TControlWrapper MinLeaderLeaseDurationUs; + TVolatileState VolatileState; TConfig Config; TCoordinatorMonCounters MonCounters; @@ -503,6 +509,14 @@ private: return TActor::Die(ctx); } + void IcbRegister(); + bool ReadOnlyLeaseEnabled() override; + TDuration ReadOnlyLeaseDuration() override; + + void IncCounter(NFlatTxCoordinator::ECumulativeCounters counter, ui64 num = 1) { + TabletCounters->Cumulative()[counter].Increment(num); + } + void OnActivateExecutor(const TActorContext &ctx) override; void DefaultSignalTabletActive(const TActorContext &ctx) override { |