diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-04-18 18:40:28 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-18 18:40:28 +0300 |
commit | 9ada39f7f8bf02130d705ad98476449e5ff29443 (patch) | |
tree | 5f685fdb250fe15b40eec5b95a3817f2957d3f12 | |
parent | 86304e375567b039455008cdc9b12cfc5f5c66de (diff) | |
download | ydb-9ada39f7f8bf02130d705ad98476449e5ff29443.tar.gz |
22-2: Snapshot isolation with prioritized reads, KIKIMR-13910
Merge from trunk: r9171244, r9229795, r9244174, r9285534, r9300998, r9313872
REVIEW: 2435130
x-ydb-stable-ref: 9c878f6e7949737ee83323c98c1b7c64c372710e
72 files changed, 2941 insertions, 166 deletions
diff --git a/library/cpp/actors/core/monotonic_provider.cpp b/library/cpp/actors/core/monotonic_provider.cpp new file mode 100644 index 0000000000..fb3b656da6 --- /dev/null +++ b/library/cpp/actors/core/monotonic_provider.cpp @@ -0,0 +1,16 @@ +#include "monotonic_provider.h" + +namespace NActors { + +class TDefaultMonotonicTimeProvider : public IMonotonicTimeProvider { +public: + TMonotonic Now() override { + return TMonotonic::Now(); + } +}; + +TIntrusivePtr<IMonotonicTimeProvider> CreateDefaultMonotonicTimeProvider() { + return TIntrusivePtr<IMonotonicTimeProvider>(new TDefaultMonotonicTimeProvider); +} + +} // namespace NActors diff --git a/library/cpp/actors/core/monotonic_provider.h b/library/cpp/actors/core/monotonic_provider.h new file mode 100644 index 0000000000..98e1203400 --- /dev/null +++ b/library/cpp/actors/core/monotonic_provider.h @@ -0,0 +1,14 @@ +#pragma once + +#include "monotonic.h" + +namespace NActors { + +class IMonotonicTimeProvider : public TThrRefBase { +public: + virtual TMonotonic Now() = 0; +}; + +TIntrusivePtr<IMonotonicTimeProvider> CreateDefaultMonotonicTimeProvider(); + +} // namespace NActors diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 880a9d00db..2979020d68 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -83,6 +83,8 @@ SRCS( mon_stats.h monotonic.cpp monotonic.h + monotonic_provider.cpp + monotonic_provider.h worker_context.cpp worker_context.h probes.cpp diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 6fa25b9965..51d93ba6e9 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -233,6 +233,20 @@ namespace NActors { TTestActorRuntimeBase& Runtime; }; + class TTestActorRuntimeBase::TMonotonicTimeProvider : public IMonotonicTimeProvider { + public: + TMonotonicTimeProvider(TTestActorRuntimeBase& runtime) + : Runtime(runtime) + { } + + TMonotonic Now() override { + return Runtime.GetCurrentMonotonicTime(); + } + + private: + TTestActorRuntimeBase& Runtime; + }; + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { public: TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) @@ -470,6 +484,7 @@ namespace NActors { , NeedMonitoring(false) , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) , TimeProvider(new TTimeProvider(*this)) + , MonotonicTimeProvider(new TMonotonicTimeProvider(*this)) , ShouldContinue() , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) @@ -797,6 +812,12 @@ namespace NActors { return TInstant::MicroSeconds(CurrentTimestamp); } + TMonotonic TTestActorRuntimeBase::GetCurrentMonotonicTime() const { + TGuard<TMutex> guard(Mutex); + Y_VERIFY(!UseRealThreads); + return TMonotonic::MicroSeconds(CurrentTimestamp); + } + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; @@ -823,6 +844,11 @@ namespace NActors { return TimeProvider; } + TIntrusivePtr<IMonotonicTimeProvider> TTestActorRuntimeBase::GetMonotonicTimeProvider() { + Y_VERIFY(!UseRealThreads); + return MonotonicTimeProvider; + } + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); return FirstNodeId + index; diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 26e3b45c98..c14cde1b66 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/executor_thread.h> #include <library/cpp/actors/core/mailbox.h> +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/actors/interconnect/poller_tcp.h> #include <library/cpp/actors/interconnect/mock/ic_mock.h> @@ -188,6 +189,7 @@ namespace NActors { class TSchedulerThreadStub; class TExecutorPoolStub; class TTimeProvider; + class TMonotonicTimeProvider; enum class EEventAction { PROCESS, @@ -229,7 +231,9 @@ namespace NActors { void SetLogBackend(const TAutoPtr<TLogBackend> logBackend); void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); TIntrusivePtr<ITimeProvider> GetTimeProvider(); + TIntrusivePtr<IMonotonicTimeProvider> GetMonotonicTimeProvider(); TInstant GetCurrentTime() const; + TMonotonic GetCurrentMonotonicTime() const; void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); @@ -534,6 +538,7 @@ namespace NActors { TIntrusivePtr<IRandomProvider> RandomProvider; TIntrusivePtr<ITimeProvider> TimeProvider; + TIntrusivePtr<IMonotonicTimeProvider> MonotonicTimeProvider; protected: struct TNodeDataBase: public TThrRefBase { diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp index f9e517fc42..b2b77e9e88 100644 --- a/ydb/core/base/appdata.cpp +++ b/ydb/core/base/appdata.cpp @@ -19,6 +19,7 @@ TAppData::TAppData( , TypeRegistry(typeRegistry) , FunctionRegistry(functionRegistry) , FormatFactory(formatFactory) + , MonotonicTimeProvider(CreateDefaultMonotonicTimeProvider()) , ProxySchemeCacheNodes(Max<ui64>() / 4) , ProxySchemeCacheDistrNodes(Max<ui64>() / 4) , CompilerSchemeCachePaths(Max<ui64>() / 4) diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index 0a91d2560e..d6c0075e62 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -21,6 +21,7 @@ #include <library/cpp/actors/interconnect/poller_tcp.h> #include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/random_provider/random_provider.h> #include <library/cpp/time_provider/time_provider.h> @@ -116,6 +117,7 @@ struct TAppData { static TIntrusivePtr<IRandomProvider> RandomProvider; static TIntrusivePtr<ITimeProvider> TimeProvider; + TIntrusivePtr<IMonotonicTimeProvider> MonotonicTimeProvider; TIntrusivePtr<TDomainsInfo> DomainsInfo; TIntrusivePtr<TChannelProfiles> ChannelProfiles; TIntrusivePtr<TDynamicNameserviceConfig> DynamicNameserviceConfig; diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h index 602e39c600..3b97f4fe09 100644 --- a/ydb/core/base/tablet.h +++ b/ydb/core/base/tablet.h @@ -52,6 +52,7 @@ struct TEvTablet { EvFollowerSyncComplete, // from leader to user tablet when all old followers are touched and synced EvCutTabletHistory, EvUpdateConfig, + EvDropLease, EvCommit = EvBoot + 512, EvAux, @@ -60,6 +61,7 @@ struct TEvTablet { EvTabletActive, EvPromoteToLeader, EvFGcAck, // from user tablet to follower + EvLeaseDropped, EvTabletDead = EvBoot + 1024, EvFollowerUpdateState, // notifications to guardian @@ -101,6 +103,16 @@ struct TEvTablet { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET)"); + struct TCommitMetadata { + ui32 Key; + TString Data; + + TCommitMetadata(ui32 key, TString data) + : Key(key) + , Data(std::move(data)) + { } + }; + struct TDependencyGraph : public TThrRefBase { struct TEntry { std::pair<ui32, ui32> Id; @@ -111,28 +123,29 @@ struct TEvTablet { TVector<TLogoBlobID> GcLeft; TString EmbeddedLogBody; - - TEntry() - : IsSnapshot(false) - {} - - void Set(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &refs, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Id = id; - References.swap(refs); - IsSnapshot = isSnapshot; - GcDiscovered.swap(gcDiscovered); - GcLeft.swap(gcLeft); - EmbeddedLogBody.clear(); - } - - void Set(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Id = id; - References.clear(); - IsSnapshot = false; - GcDiscovered.swap(gcDiscovered); - GcLeft.swap(gcLeft); - EmbeddedLogBody = embeddedLogBody; - } + TVector<TCommitMetadata> EmbeddedMetadata; + + TEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + : Id(id) + , IsSnapshot(isSnapshot) + , References(std::move(refs)) + , GcDiscovered(std::move(gcDiscovered)) + , GcLeft(std::move(gcLeft)) + , EmbeddedMetadata(std::move(metadata)) + { } + + TEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + : Id(id) + , IsSnapshot(false) + , GcDiscovered(std::move(gcDiscovered)) + , GcLeft(std::move(gcLeft)) + , EmbeddedLogBody(std::move(embeddedLogBody)) + , EmbeddedMetadata(std::move(metadata)) + { } }; std::pair<ui32, ui32> Snapshot; @@ -148,19 +161,23 @@ struct TEvTablet { << " entries " << Entries.size() << "}"; } - void AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &references, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { + TEntry& AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + { if (isSnapshot) { Snapshot = id; Entries.clear(); } - Entries.push_back(TEntry()); - Entries.back().Set(id, references, isSnapshot, gcDiscovered, gcLeft); + return Entries.emplace_back(id, std::move(refs), isSnapshot, std::move(gcDiscovered), std::move(gcLeft), std::move(metadata)); } - void AddEntry(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) { - Entries.push_back(TEntry()); - Entries.back().Set(id, embeddedLogBody, gcDiscovered, gcLeft); + TEntry& AddEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody, + TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft, + TVector<TCommitMetadata> &&metadata) + { + return Entries.emplace_back(id, std::move(embeddedLogBody), std::move(gcDiscovered), std::move(gcLeft), std::move(metadata)); } void Invalidate() { @@ -269,6 +286,8 @@ struct TEvTablet { TString EmbeddedLogBody; TString FollowerAux; + TVector<TCommitMetadata> EmbeddedMetadata; + TEvCommit(ui64 tabletId, ui32 gen, ui32 step, const TVector<ui32> &dependsOn, bool isSnapshot , bool preCommited = false , TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticMinLatency) @@ -777,6 +796,22 @@ struct TEvTablet { }; struct TEvTabletStopped : TEventLocal<TEvTabletStopped, EvTabletStopped> {}; + + struct TEvDropLease : TEventPB<TEvDropLease, NKikimrTabletBase::TEvDropLease, EvDropLease> { + TEvDropLease() = default; + + explicit TEvDropLease(ui64 tabletId) { + Record.SetTabletID(tabletId); + } + }; + + struct TEvLeaseDropped : TEventPB<TEvLeaseDropped, NKikimrTabletBase::TEvLeaseDropped, EvLeaseDropped> { + TEvLeaseDropped() = default; + + explicit TEvLeaseDropped(ui64 tabletId) { + Record.SetTabletID(tabletId); + } + }; }; IActor* CreateTabletKiller(ui64 tabletId, ui32 nodeId = 0, ui32 maxGeneration = Max<ui32>()); diff --git a/ydb/core/cms/console/immediate_controls_configurator.cpp b/ydb/core/cms/console/immediate_controls_configurator.cpp index af3ad24217..4f959a1891 100644 --- a/ydb/core/cms/console/immediate_controls_configurator.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator.cpp @@ -18,7 +18,8 @@ public: } TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg); + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls); void Bootstrap(const TActorContext &ctx); @@ -38,13 +39,15 @@ public: } private: - void CreateControls(TIntrusivePtr<TControlBoard> board); + void CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting); void CreateControls(TIntrusivePtr<TControlBoard> board, const google::protobuf::Descriptor *desc, - const TString &prefix); + const TString &prefix, + bool allowExisting); void AddControl(TIntrusivePtr<TControlBoard> board, const google::protobuf::FieldDescriptor *desc, - const TString &prefix); + const TString &prefix, + bool allowExisting); void ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg, TIntrusivePtr<TControlBoard> board); void ApplyConfig(const ::google::protobuf::Message &cfg, @@ -57,9 +60,10 @@ private: }; TImmediateControlsConfigurator::TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg) + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls) { - CreateControls(board); + CreateControls(board, allowExistingControls); ApplyConfig(cfg, board); } @@ -98,15 +102,16 @@ void TImmediateControlsConfigurator::Handle(TEvConsole::TEvConfigNotificationReq ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie); } -void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board) +void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting) { auto *desc = NKikimrConfig::TImmediateControlsConfig::descriptor(); - CreateControls(board, desc, ""); + CreateControls(board, desc, "", allowExisting); } void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, const google::protobuf::Descriptor *desc, - const TString &prefix) + const TString &prefix, + bool allowExisting) { for (int i = 0; i < desc->field_count(); ++i) { auto *fieldDesc = desc->field(i); @@ -117,20 +122,22 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> auto fieldType = fieldDesc->type(); if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64 || fieldType == google::protobuf::FieldDescriptor::TYPE_INT64) - AddControl(board, fieldDesc, prefix); + AddControl(board, fieldDesc, prefix, allowExisting); else { Y_VERIFY(fieldType == google::protobuf::FieldDescriptor::TYPE_MESSAGE, "Only [u]int64 and message fields are allowed in Immediate Controls Config"); CreateControls(board, fieldDesc->message_type(), - MakePrefix(prefix, fieldDesc->name())); + MakePrefix(prefix, fieldDesc->name()), + allowExisting); } } } void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> board, const google::protobuf::FieldDescriptor *desc, - const TString &prefix) + const TString &prefix, + bool allowExisting) { auto &opts = desc->options().GetExtension(NKikimrConfig::ControlOptions); auto name = MakePrefix(prefix, desc->name()); @@ -139,11 +146,22 @@ void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> boa ui64 maxValue = opts.GetMaxValue(); Controls[name] = TControlWrapper(defaultValue, minValue, maxValue); - // All controls are actually shared but use RegisterLocalControl to - // make sure it is not registered by someone else with other options. - auto res = board->RegisterLocalControl(Controls[name], name); - Y_VERIFY_S(res, "Immediate Control " << name << " was registered before " - << "TImmediateControlsConfigurator creation"); + + // When we register control it is possible that is has already been + // registered by some other code, in which case it may be used before it + // is properly configured. It can currently only happen in configurator + // tests, where it is created very late after some tablets have already + // started. + auto res = board->RegisterSharedControl(Controls[name], name); + Y_VERIFY_S(res || allowExisting, + "Immediate Control " << name << " was registered before " + << "TImmediateControlsConfigurator creation"); + if (Y_UNLIKELY(!res)) { + Cerr << "WARNING: immediate control " << name << " was registered before " + << "TImmediateControlsConfigurator creation. " + << "A default value may have been used before it was configured." << Endl; + Controls[name].Reset(defaultValue, minValue, maxValue); + } } void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg, @@ -192,9 +210,10 @@ TString TImmediateControlsConfigurator::MakePrefix(const TString &prefix, } IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg) + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls) { - return new TImmediateControlsConfigurator(board, cfg); + return new TImmediateControlsConfigurator(board, cfg, allowExistingControls); } } // namespace NConsole diff --git a/ydb/core/cms/console/immediate_controls_configurator.h b/ydb/core/cms/console/immediate_controls_configurator.h index c6c4ba029d..53f1b3e5d7 100644 --- a/ydb/core/cms/console/immediate_controls_configurator.h +++ b/ydb/core/cms/console/immediate_controls_configurator.h @@ -13,7 +13,8 @@ namespace NConsole { * immediate control board via CMS. */ IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board, - const NKikimrConfig::TImmediateControlsConfig &cfg); + const NKikimrConfig::TImmediateControlsConfig &cfg, + bool allowExistingControls = false); } // namespace NConsole } // namespace NKikimr diff --git a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp index 8ad8f6ed1b..2d2a9f8251 100644 --- a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp +++ b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp @@ -57,7 +57,8 @@ NKikimrConsole::TConfigItem ITEM_CONTROLS_EXCEED_MAX; void InitImmediateControlsConfigurator(TTenantTestRuntime &runtime) { runtime.Register(CreateImmediateControlsConfigurator(runtime.GetAppData().Icb, - NKikimrConfig::TImmediateControlsConfig())); + NKikimrConfig::TImmediateControlsConfig(), + /* allowExistingControls */ true)); TDispatchOptions options; options.FinalEvents.emplace_back(TEvConfigsDispatcher::EvSetConfigSubscriptionResponse, 1); runtime.DispatchEvents(options); diff --git a/ydb/core/cms/json_proxy_proto.h b/ydb/core/cms/json_proxy_proto.h index b1b87f30cd..9c66617066 100644 --- a/ydb/core/cms/json_proxy_proto.h +++ b/ydb/core/cms/json_proxy_proto.h @@ -73,6 +73,8 @@ protected: return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TDataShardControls::TExecutionProfileOptions::descriptor(), ctx); else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTxLimitControls") return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTxLimitControls::descriptor(), ctx); + else if (name == ".NKikimrConfig.TImmediateControlsConfig.TCoordinatorControls") + return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TCoordinatorControls::descriptor(), ctx); } ctx.Send(RequestEvent->Sender, diff --git a/ydb/core/control/immediate_control_board_control.cpp b/ydb/core/control/immediate_control_board_control.cpp index 7762cf9d88..6acb68e985 100644 --- a/ydb/core/control/immediate_control_board_control.cpp +++ b/ydb/core/control/immediate_control_board_control.cpp @@ -15,6 +15,13 @@ void TControl::Set(TAtomicBase newValue) { AtomicSet(Default, newValue); } +void TControl::Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) { + Value = defaultValue; + Default = defaultValue; + LowerBound = lowerBound; + UpperBound = upperBound; +} + TAtomicBase TControl::SetFromHtmlRequest(TAtomicBase newValue) { TAtomicBase prevValue = AtomicGet(Value); if (newValue == AtomicGet(Default)) { diff --git a/ydb/core/control/immediate_control_board_control.h b/ydb/core/control/immediate_control_board_control.h index 7fd038b03a..381b9cb0fc 100644 --- a/ydb/core/control/immediate_control_board_control.h +++ b/ydb/core/control/immediate_control_board_control.h @@ -16,6 +16,7 @@ public: TControl(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound); void Set(TAtomicBase newValue); + void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound); TAtomicBase SetFromHtmlRequest(TAtomicBase newValue); diff --git a/ydb/core/control/immediate_control_board_impl.cpp b/ydb/core/control/immediate_control_board_impl.cpp index fa26926fae..06f6dab1a8 100644 --- a/ydb/core/control/immediate_control_board_impl.cpp +++ b/ydb/core/control/immediate_control_board_impl.cpp @@ -15,8 +15,14 @@ bool TControlBoard::RegisterLocalControl(TControlWrapper control, TString name) return result; } -void TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) { - control.Control = Board.InsertIfAbsent(name, control.Control); +bool TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) { + auto& ptr = Board.InsertIfAbsent(name, control.Control); + if (control.Control == ptr) { + return true; + } else { + control.Control = ptr; + return false; + } } void TControlBoard::RestoreDefaults() { diff --git a/ydb/core/control/immediate_control_board_impl.h b/ydb/core/control/immediate_control_board_impl.h index a01e09f4d6..f5463aa12d 100644 --- a/ydb/core/control/immediate_control_board_impl.h +++ b/ydb/core/control/immediate_control_board_impl.h @@ -14,7 +14,7 @@ private: public: bool RegisterLocalControl(TControlWrapper control, TString name); - void RegisterSharedControl(TControlWrapper& control, TString name); + bool RegisterSharedControl(TControlWrapper& control, TString name); void RestoreDefaults(); diff --git a/ydb/core/control/immediate_control_board_wrapper.h b/ydb/core/control/immediate_control_board_wrapper.h index ce8a6adde5..e2da0d80d0 100644 --- a/ydb/core/control/immediate_control_board_wrapper.h +++ b/ydb/core/control/immediate_control_board_wrapper.h @@ -29,6 +29,15 @@ public: bool IsTheSame(TControlWrapper another) { return Control == another.Control; } + + /** + * Resets an existing control to different values. + * + * WARNING: this method is not thread safe and may only be used during initialization. + */ + void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) { + Control->Reset(defaultValue, lowerBound, upperBound); + } }; class TMemorizableControlWrapper { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index ba911f7f4f..9cce061fe1 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1178,6 +1178,28 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 134217728, DefaultValue: 0 }]; + + optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = { + Description: "Enables prioritized mvcc snapshot reads over immediate writes", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = { + Description: "Enables unprotected (fully readonly) mvcc snapshot reads", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + + optional uint64 EnableLeaderLeases = 10 [(ControlOptions) = { + Description: "Enables leader leases for processing read-only queries", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 MinLeaderLeaseDurationUs = 11 [(ControlOptions) = { + Description: "The minimum leader lease duration in microseconds", + MinValue: 1000, + MaxValue: 5000000, + DefaultValue: 250000 }]; } message TTxLimitControls { @@ -1203,8 +1225,22 @@ message TImmediateControlsConfig { DefaultValue: 60000 }]; } + message TCoordinatorControls { + optional uint64 EnableLeaderLeases = 1 [(ControlOptions) = { + Description: "Enables leader leases for processing read-only queries", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 MinLeaderLeaseDurationUs = 2 [(ControlOptions) = { + Description: "The minimum leader lease duration in microseconds", + MinValue: 1000, + MaxValue: 5000000, + DefaultValue: 250000 }]; + } + optional TDataShardControls DataShardControls = 1; optional TTxLimitControls TxLimitControls = 2; + optional TCoordinatorControls CoordinatorControls = 3; }; message TMeteringConfig { diff --git a/ydb/core/protos/counters_coordinator.proto b/ydb/core/protos/counters_coordinator.proto index 28433e3e1a..f5437245bb 100644 --- a/ydb/core/protos/counters_coordinator.proto +++ b/ydb/core/protos/counters_coordinator.proto @@ -12,7 +12,7 @@ enum ESimpleCounters { } enum ECumulativeCounters { - COUNTER_CUMULATIVE_IGNORE = 0; + COUNTER_REQ_ACQUIRE_READ_STEP = 0 [(CounterOpts) = {Name: "AcquireReadStepRequests"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto index f0bec238e7..9e51d432da 100644 --- a/ydb/core/protos/tablet.proto +++ b/ydb/core/protos/tablet.proto @@ -60,6 +60,11 @@ message TTabletTypes { } } +message TTabletLogMetadata { + optional uint32 Key = 1; + optional bytes Data = 2; +} + message TTabletLogEntry { // normal log entries repeated uint32 DependsOn = 1; @@ -79,6 +84,7 @@ message TTabletLogEntry { repeated fixed64 ZeroTailBitmask = 12; optional bytes EmbeddedLogBody = 13; + repeated TTabletLogMetadata EmbeddedMetadata = 14; } message TTabletChannelInfo { @@ -238,3 +244,11 @@ message TEvTabletStop { optional uint64 TabletID = 1; optional EReason Reason = 2; } + +message TEvDropLease { + optional uint64 TabletID = 1; +} + +message TEvLeaseDropped { + optional uint64 TabletID = 1; +} diff --git a/ydb/core/tablet/tablet_req_rebuildhistory.cpp b/ydb/core/tablet/tablet_req_rebuildhistory.cpp index c0b4c0c2bc..620b25c1a8 100644 --- a/ydb/core/tablet/tablet_req_rebuildhistory.cpp +++ b/ydb/core/tablet/tablet_req_rebuildhistory.cpp @@ -44,6 +44,7 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil bool IsSnapshot; bool IsTotalSnapshot; TString EmbeddedLogBody; + TVector<TEvTablet::TCommitMetadata> EmbeddedMetadata; TVector<TLogoBlobID> GcDiscovered; TVector<TLogoBlobID> GcLeft; @@ -125,6 +126,14 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil GcLeft[i] = LogoBlobIDFromLogoBlobID(x.GetGcLeft(i)); } + if (const size_t metaSize = x.EmbeddedMetadataSize()) { + EmbeddedMetadata.reserve(metaSize); + for (size_t i = 0; i < metaSize; ++i) { + const auto& meta = x.GetEmbeddedMetadata(i); + EmbeddedMetadata.emplace_back(meta.GetKey(), meta.GetData()); + } + } + switch (Status) { case StatusUnknown: Status = StatusBody; @@ -679,9 +688,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil }()); if (entry.EmbeddedLogBody) - graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); else - graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); if (lastUnbrokenTailEntry + 1 == id.second) lastUnbrokenTailEntry = id.second; @@ -717,9 +726,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil }()); if (entry.EmbeddedLogBody) - graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); else - graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft); + graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata)); hasSnapshotInGeneration |= entry.IsSnapshot; break; diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp index 3f3081f128..4537a4af91 100644 --- a/ydb/core/tablet/tablet_sys.cpp +++ b/ydb/core/tablet/tablet_sys.cpp @@ -1164,6 +1164,16 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) { entry->FollowerUpdate->Body = msg->EmbeddedLogBody; } + if (!msg->EmbeddedMetadata.empty()) { + auto *m = x->MutableEmbeddedMetadata(); + m->Reserve(msg->EmbeddedMetadata.size()); + for (const auto &meta : msg->EmbeddedMetadata) { + auto *p = m->Add(); + p->SetKey(meta.Key); + p->SetData(meta.Data); + } + } + if (saveFollowerUpdate && msg->FollowerAux) entry->FollowerUpdate->AuxPayload = msg->FollowerAux; diff --git a/ydb/core/tablet_flat/flat_boot_cookie.h b/ydb/core/tablet_flat/flat_boot_cookie.h index 58e810325a..4ff737c2bd 100644 --- a/ydb/core/tablet_flat/flat_boot_cookie.h +++ b/ydb/core/tablet_flat/flat_boot_cookie.h @@ -68,6 +68,10 @@ namespace NBoot { const ui32 Raw = 0; /* only lower 24 bits are used */ }; + enum class ELogCommitMeta : ui32 { + LeaseInfo = 1, + }; + } } } diff --git a/ydb/core/tablet_flat/flat_boot_lease.cpp b/ydb/core/tablet_flat/flat_boot_lease.cpp new file mode 100644 index 0000000000..0fe6966b49 --- /dev/null +++ b/ydb/core/tablet_flat/flat_boot_lease.cpp @@ -0,0 +1,110 @@ +#include "flat_executor_bootlogic.h" +#include <ydb/core/util/pb.h> +#include <library/cpp/actors/interconnect/interconnect.h> + +namespace NKikimr { +namespace NTabletFlatExecutor { + +class TLeaseWaiter : public TActorBootstrapped<TLeaseWaiter> { +public: + TLeaseWaiter( + const TActorId& owner, + TMonotonic bootTimestamp, + const TActorId& leaseHolder, + TDuration leaseDuration) + : Owner(owner) + , BootTimestamp(bootTimestamp) + , LeaseHolder(leaseHolder) + , LeaseDuration(leaseDuration) + { } + + void Bootstrap() { + SendRequest(); + Schedule(BootTimestamp + LeaseDuration * 2, new TEvents::TEvWakeup); + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + sFunc(TEvents::TEvWakeup, HandleTimeout); + hFunc(TEvTablet::TEvLeaseDropped, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } + + void SendRequest() { + Send(LeaseHolder, new TEvTablet::TEvDropLease(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); + } + + void Finish() { + Send(Owner, new TEvTablet::TEvLeaseDropped()); + PassAway(); + } + + void HandleTimeout() { + Finish(); + } + + void Handle(TEvTablet::TEvLeaseDropped::TPtr&) { + Finish(); + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr&) { + // We don't need to do anything + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr&) { + // Keep retrying until timeout is reached + SendRequest(); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + auto* msg = ev->Get(); + Y_VERIFY(ev->Sender == LeaseHolder); + Y_VERIFY(msg->SourceType == TEvTablet::TEvDropLease::EventType); + if (msg->Reason == TEvents::TEvUndelivered::ReasonActorUnknown) { + // We have proved lease holder no longer exists + return Finish(); + } + + // We may get undelivered notifications due to disconnections + // Since we cannot trust them we expect to retry on TEvNodeDisconnected + } + +private: + const TActorId Owner; + const TMonotonic BootTimestamp; + const TActorId LeaseHolder; + const TDuration LeaseDuration; +}; + +void TExecutorBootLogic::StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept +{ + TActorId leaseHolder; + TDuration leaseDuration; + + for (const auto& entry : graph.Entries) { + for (const auto& meta : entry.EmbeddedMetadata) { + if (NBoot::ELogCommitMeta(meta.Key) == NBoot::ELogCommitMeta::LeaseInfo) { + TProtoBox<NKikimrExecutorFlat::TLeaseInfoMetadata> proto(meta.Data); + if (proto.HasLeaseHolder()) { + leaseHolder = ActorIdFromProto(proto.GetLeaseHolder()); + } + if (proto.HasLeaseDurationUs()) { + leaseDuration = TDuration::MicroSeconds(proto.GetLeaseDurationUs()); + } + } + } + } + + if (leaseHolder && leaseDuration) { + LeaseWaiter = Ops->RegisterWithSameMailbox(new TLeaseWaiter(SelfId, bootTimestamp, leaseHolder, leaseDuration)); + } +} + +} // namespace NTabletFlatExecutor +} // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_exec_commit.h b/ydb/core/tablet_flat/flat_exec_commit.h index 7212449c25..a1402dcf24 100644 --- a/ydb/core/tablet_flat/flat_exec_commit.h +++ b/ydb/core/tablet_flat/flat_exec_commit.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/base/logoblob.h> +#include <ydb/core/base/tablet.h> #include <util/generic/vector.h> #include <util/generic/string.h> @@ -53,6 +54,7 @@ namespace NTabletFlatExecutor { TString FollowerAux; TVector<TLogoBlob> Refs; TGCBlobDelta GcDelta; + TVector<TEvTablet::TCommitMetadata> Metadata; TSeat *FirstTx = nullptr; TSeat *LastTx = nullptr; }; diff --git a/ydb/core/tablet_flat/flat_exec_commit_mgr.h b/ydb/core/tablet_flat/flat_exec_commit_mgr.h index 20f5d5ffe1..caeabb014f 100644 --- a/ydb/core/tablet_flat/flat_exec_commit_mgr.h +++ b/ydb/core/tablet_flat/flat_exec_commit_mgr.h @@ -175,6 +175,7 @@ namespace NTabletFlatExecutor { ev->FollowerAux = std::move(commit.FollowerAux); ev->GcDiscovered = std::move(commit.GcDelta.Created); ev->GcLeft = std::move(commit.GcDelta.Deleted); + ev->EmbeddedMetadata = std::move(commit.Metadata); Ops->Send(Owner, ev, 0, ui64(commit.Type)); } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 164775ea47..91f3a137bc 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -408,6 +408,15 @@ void TExecutor::Active(const TActorContext &ctx) { CompactionLogic->UpdateInMemStatsStep(it.first, 0, Database->GetTableMemSize(it.first)); UpdateCompactions(); + + LeaseEnabled = Owner->ReadOnlyLeaseEnabled(); + if (LeaseEnabled) { + LeaseDuration = Owner->ReadOnlyLeaseDuration(); + if (!LeaseDuration) { + LeaseEnabled = false; + } + } + MakeLogSnapshot(); if (auto error = CheckBorrowConsistency()) { @@ -432,8 +441,6 @@ void TExecutor::TranscriptBootOpResult(ui32 res, const TActorContext &ctx) { case TExecutorBootLogic::OpResultComplete: return Active(ctx); case TExecutorBootLogic::OpResultBroken: - BootLogic.Destroy(); - if (auto logl = Logger->Log(ELnLev::Error)) { logl << NFmt::Do(*this) << " Broken while booting"; } @@ -453,10 +460,10 @@ void TExecutor::TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ct case TExecutorBootLogic::OpResultComplete: return ActivateFollower(ctx); case TExecutorBootLogic::OpResultBroken: - BootLogic.Destroy(); if (auto logl = Logger->Log(ELnLev::Error)) { logl << NFmt::Do(*this) << " Broken while follower booting"; } + return Broken(); default: Y_FAIL("unknown boot result"); @@ -588,6 +595,7 @@ TExecutorCaches TExecutor::CleanupState() { TExecutorCaches caches; if (BootLogic) { + BootLogic->Cancel(); caches = BootLogic->DetachCaches(); } else { if (PrivatePageCache) { @@ -1395,6 +1403,85 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } } +TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + Y_VERIFY(at >= LeaseEnd); + + if (!LeaseEnabled) { + // Automatically enable leases + LeaseEnabled = true; + LeaseDuration = Owner->ReadOnlyLeaseDuration(); + Y_VERIFY(LeaseDuration); + } + + // Try to find a suitable commit that is already in flight + TLeaseCommit* lease = nullptr; + for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) { + if (at < it->LeaseEnd) { + lease = &*it; + } else { + break; + } + } + + if (!lease) { + if (LeaseDropped) { + // We cannot start new lease confirmations + return nullptr; + } + + LogicRedo->FlushBatchedLog(); + + auto commit = CommitManager->Begin(true, ECommit::Misc); + + NKikimrExecutorFlat::TLeaseInfoMetadata proto; + ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); + proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); + + TString data; + bool ok = proto.SerializeToString(&data); + Y_VERIFY(ok); + + commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + + CommitManager->Commit(commit); + } + + return lease; +} + +void TExecutor::ConfirmReadOnlyLease(TMonotonic at) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + LeaseUsed = true; + + if (LeaseEnabled && at < LeaseEnd) { + return; + } + + EnsureReadOnlyLease(at); +} + +void TExecutor::ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) { + Y_VERIFY(Stats->IsActive && !Stats->IsFollower); + LeaseUsed = true; + + if (LeaseEnabled && at < LeaseEnd) { + callback(); + return; + } + + if (auto* lease = EnsureReadOnlyLease(at)) { + lease->Callbacks.push_back(std::move(callback)); + } +} + +void TExecutor::ConfirmReadOnlyLease(std::function<void()> callback) { + ConfirmReadOnlyLease(AppData()->MonotonicTimeProvider->Now(), std::move(callback)); +} + bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction; } @@ -2389,6 +2476,21 @@ void TExecutor::MakeLogSnapshot() { GcLogic->SnapToLog(snap, commit->Step); LogicSnap->MakeSnap(snap, *commit, Logger.Get()); + if (LeaseEnabled) { + NKikimrExecutorFlat::TLeaseInfoMetadata proto; + ActorIdToProto(SelfId(), proto.MutableLeaseHolder()); + proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds()); + + TString data; + bool ok = proto.SerializeToString(&data); + Y_VERIFY(ok); + + commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data)); + + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration); + } + CommitManager->Commit(commit); CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage()); @@ -2605,6 +2707,41 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) { } } +void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx) { + TMonotonic ts = AppData(ctx)->MonotonicTimeProvider->Now(); + + LeaseDropped = true; + LeaseEnd = Min(LeaseEnd, ts); + + for (auto& l : LeaseCommits) { + l.LeaseEnd = Min(l.LeaseEnd, ts); + } + + ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped); + Owner->ReadOnlyLeaseDropped(); +} + +void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &) { + Y_VERIFY(LeaseExtendPending); + LeaseExtendPending = false; + + if (!LeaseCommits.empty() || !LeaseEnabled || LeaseDropped) { + return; + } + + if (LeaseUsed) { + LeaseUsed = false; + UnusedLeaseExtensions = 0; + } else if (UnusedLeaseExtensions >= 5) { + return; + } else { + ++UnusedLeaseExtensions; + } + + // Start a new lease extension commit + EnsureReadOnlyLease(LeaseEnd); +} + void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx) { TEvTablet::TEvCommitResult *msg = ev->Get(); @@ -2631,6 +2768,41 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext << " for step " << step; } + if (!LeaseCommits.empty()) { + auto& l = LeaseCommits.front(); + Y_VERIFY(step <= l.Step); + if (step == l.Step) { + LeasePersisted = true; + LeaseEnd = Max(LeaseEnd, l.LeaseEnd); + + while (!l.Callbacks.empty()) { + // Note: callback may (though unlikely) recursively add more callbacks + TVector<std::function<void()>> callbacks; + callbacks.swap(l.Callbacks); + for (auto& callback : callbacks) { + callback(); + } + } + + LeaseCommits.pop_front(); + + // Calculate a full round-trip latency for leases + // When this latency is larger than third of lease duration we want + // to increase lease duration so we would have enough time for + // processing read-only requests without additional commits + TMonotonic ts = AppData()->MonotonicTimeProvider->Now(); + if ((LeaseEnd - ts) < LeaseDuration / 3) { + LeaseDuration *= 2; + } + + // We want to schedule a new commit before the lease expires + if (LeaseCommits.empty() && !LeaseExtendPending) { + Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend); + LeaseExtendPending = true; + } + } + } + switch (cookie) { case ECommit::Redo: { @@ -3562,11 +3734,13 @@ STFUNC(TExecutor::StateWork) { CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters); cFunc(TEvPrivate::EvCheckYellow, UpdateYellow); cFunc(TEvPrivate::EvUpdateCompactions, UpdateCompactions); + HFunc(TEvPrivate::TEvLeaseExtend, Handle); HFunc(TEvents::TEvWakeup, Wakeup); hFunc(TEvents::TEvFlushLog, Handle); hFunc(NSharedCache::TEvRequest, Handle); hFunc(NSharedCache::TEvResult, Handle); hFunc(NSharedCache::TEvUpdated, Handle); + HFunc(TEvTablet::TEvDropLease, Handle); HFunc(TEvTablet::TEvCommitResult, Handle); hFunc(TEvTablet::TEvCheckBlobstorageStatusResult, Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 4c6f091560..9e72b436cf 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -334,6 +334,7 @@ class TExecutor EvActivateCompactionRead, EvActivateCompactionChanges, EvBrokenTransaction, + EvLeaseExtend, EvEnd }; @@ -347,6 +348,7 @@ class TExecutor struct TEvActivateCompactionRead : public TEventLocal<TEvActivateCompactionRead, EvActivateCompactionRead> {}; struct TEvActivateCompactionChanges : public TEventLocal<TEvActivateCompactionChanges, EvActivateCompactionChanges> {}; struct TEvBrokenTransaction : public TEventLocal<TEvBrokenTransaction, EvBrokenTransaction> {}; + struct TEvLeaseExtend : public TEventLocal<TEvLeaseExtend, EvLeaseExtend> {}; }; const TIntrusivePtr<ITimeProvider> Time = nullptr; @@ -356,6 +358,37 @@ class TExecutor ui32 FollowerId = 0; + // This becomes true when executor enables the use of leases, e.g. starts persisting them + // This may become false again when leases are not actively used for some time + bool LeaseEnabled = false; + // As soon as lease is persisted we may theoretically use read-only checks for lease prolongation + bool LeasePersisted = false; + // When lease is dropped we must stop accepting new lease-dependent requests + bool LeaseDropped = false; + // When lease is used in any given cycle this becomes true + bool LeaseUsed = false; + // This flag marks when TEvLeaseExtend message is already pending + bool LeaseExtendPending = false; + TDuration LeaseDuration; + TMonotonic LeaseEnd; + // Counts the number of times an unused lease has been extended + size_t UnusedLeaseExtensions = 0; + + struct TLeaseCommit { + const ui32 Step; + const TMonotonic Start; + TMonotonic LeaseEnd; + TVector<std::function<void()>> Callbacks; + + TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd) + : Step(step) + , Start(start) + , LeaseEnd(leaseEnd) + { } + }; + + TList<TLeaseCommit> LeaseCommits; + using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>; THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue; THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> PendingQueue; @@ -502,6 +535,8 @@ class TExecutor void ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch); void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx); + void Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPrivate::TEvLeaseExtend::TPtr &ev, const TActorContext &ctx); void Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx); @@ -575,6 +610,11 @@ public: void DetachTablet(const TActorContext &ctx) override; void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override; + TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); + void ConfirmReadOnlyLease(TMonotonic at) override; + void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override; + void ConfirmReadOnlyLease(std::function<void()> callback) override; + TString BorrowSnapshot(ui32 tableId, const TTableSnapshotContext& snap, TRawVals from, TRawVals to, ui64 loaner) const override; ui64 MakeScanSnapshot(ui32 table) override; diff --git a/ydb/core/tablet_flat/flat_executor.proto b/ydb/core/tablet_flat/flat_executor.proto index 7758812187..0b865d16df 100644 --- a/ydb/core/tablet_flat/flat_executor.proto +++ b/ydb/core/tablet_flat/flat_executor.proto @@ -1,6 +1,7 @@ import "ydb/core/protos/base.proto"; import "ydb/core/protos/tablet.proto"; import "ydb/core/protos/flat_scheme_op.proto"; +import "library/cpp/actors/protos/actors.proto"; package NKikimrExecutorFlat; option java_package = "ru.yandex.kikimr.proto"; @@ -247,3 +248,13 @@ message TDatabaseBorrowPart { repeated TPartInfo Parts = 3; repeated TTxStatus TxStatusParts = 4; } + +message TLeaseInfoMetadata { + // Actor id of the lease holder, which is an executor actor + optional NActorsProto.TActorId LeaseHolder = 1; + + // Lease duration in microseconds. Lease holder may perform leader-only + // tasks up to this duration since the last channel 0 block confirmation, + // and expects future generations will not interfere. + optional uint32 LeaseDurationUs = 2; +} diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp index cf72960f9c..4292a61d3c 100644 --- a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp @@ -103,9 +103,14 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot( TEvTablet::TEvBoot::TPtr &ev, TExecutorCaches &&caches) { - PrepareEnv(false, ev->Get()->Generation, std::move(caches)); + TEvTablet::TEvBoot *msg = ev->Get(); + PrepareEnv(false, msg->Generation, std::move(caches)); - Steps->Spawn<NBoot::TStages>(std::move(ev->Get()->DependencyGraph), nullptr); + if (msg->DependencyGraph) { + StartLeaseWaiter(BootTimestamp, *msg->DependencyGraph); + } + + Steps->Spawn<NBoot::TStages>(std::move(msg->DependencyGraph), nullptr); Steps->Execute(); return CheckCompletion(); @@ -113,7 +118,7 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot( void TExecutorBootLogic::PrepareEnv(bool follower, ui32 gen, TExecutorCaches caches) noexcept { - BootStartTime = TAppData::TimeProvider->Now(); + BootTimestamp = AppData()->MonotonicTimeProvider->Now(); auto *sys = TlsActivationContext->ExecutorThread.ActorSystem; auto *logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_FLATBOOT); @@ -212,9 +217,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::CheckCompletion() if (Loads) return OpResultContinue; + if (LeaseWaiter) + return OpResultContinue; + if (State().Follower || Restored) { if (auto logl = Steps->Logger()->Log(ELnLev::Info)) { - auto spent = TAppData::TimeProvider->Now() - BootStartTime; + auto spent = AppData()->MonotonicTimeProvider->Now() - BootTimestamp; logl << NFmt::Do(State()) << " booting completed" @@ -278,6 +286,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl step.Drop(); Steps->Execute(); + } else if (auto *msg = ev.CastAsLocal<TEvTablet::TEvLeaseDropped>()) { + if (LeaseWaiter != ev.Sender) { + return OpResultUnhandled; + } + + LeaseWaiter = { }; } else { return OpResultUnhandled; } @@ -291,6 +305,9 @@ TAutoPtr<NBoot::TResult> TExecutorBootLogic::ExtractState() noexcept { } void TExecutorBootLogic::Cancel() { + if (LeaseWaiter) { + Ops->Send(LeaseWaiter, new TEvents::TEvPoison); + } } void TExecutorBootLogic::FollowersSyncComplete() { diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.h b/ydb/core/tablet_flat/flat_executor_bootlogic.h index 34edb2d6b4..56084c36b0 100644 --- a/ydb/core/tablet_flat/flat_executor_bootlogic.h +++ b/ydb/core/tablet_flat/flat_executor_bootlogic.h @@ -74,8 +74,9 @@ private: TAutoPtr<NBoot::TBack> State_; TAutoPtr<NBoot::TResult> Result_; TAutoPtr<NBoot::TRoot> Steps; + TActorId LeaseWaiter; - TInstant BootStartTime; + TMonotonic BootTimestamp; const TIntrusiveConstPtr<TTabletStorageInfo> Info; @@ -91,6 +92,7 @@ private: EOpResult CheckCompletion(); void PrepareEnv(bool follower, ui32 generation, TExecutorCaches caches) noexcept; + void StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept; ui32 GetBSGroupFor(const TLogoBlobID &logo) const; ui32 GetBSGroupID(ui32 channel, ui32 generation); void LoadEntry(TIntrusivePtr<NBoot::TLoadBlobs>); diff --git a/ydb/core/tablet_flat/flat_executor_leases_ut.cpp b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp new file mode 100644 index 0000000000..3761f4fcd0 --- /dev/null +++ b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp @@ -0,0 +1,365 @@ +#include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/base/statestorage.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NTabletFlatExecutor { + +Y_UNIT_TEST_SUITE(TFlatExecutorLeases) { + + struct TEvLeasesTablet { + enum EEv { + EvWrite = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvWriteAck, + EvRead, + EvReadResult, + }; + + struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> { + TEvWrite(ui64 key, ui64 value) + : Key(key) + , Value(value) + { } + + const ui64 Key; + const ui64 Value; + }; + + struct TEvWriteAck : public TEventLocal<TEvWriteAck, EvWriteAck> { + TEvWriteAck(ui64 key) + : Key(key) + { } + + const ui64 Key; + }; + + struct TEvRead : public TEventLocal<TEvRead, EvRead> { + TEvRead(ui64 key) + : Key(key) + { } + + const ui64 Key; + }; + + struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> { + TEvReadResult(ui64 key, ui64 value) + : Key(key) + , Value(value) + { } + + const ui64 Key; + const ui64 Value; + }; + }; + + class TLeasesTablet + : public TActor<TLeasesTablet> + , public TTabletExecutedFlat + { + public: + TLeasesTablet(const TActorId& tablet, TTabletStorageInfo* info, bool enableInitialLease) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, nullptr) + , EnableInitialLease(enableInitialLease) + { } + + private: + struct Schema : NIceDb::Schema { + struct Data : Table<1> { + struct Key : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Value : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<Key>; + using TColumns = TableColumns<Key, Value>; + }; + + using TTables = SchemaTables<Data>; + }; + + using TTxBase = TTransactionBase<TLeasesTablet>; + + struct TTxInitSchema : public TTxBase { + TTxInitSchema(TSelf* self) + : TTxBase(self) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + NIceDb::TNiceDb db(txc.DB); + db.Materialize<Schema>(); + Self->RunTxInit(); + return true; + } + + void Complete(const TActorContext&) { + // nothing + } + }; + + void RunTxInitSchema() { + Execute(new TTxInitSchema(this)); + } + + struct TTxInit : public TTxBase { + TTxInit(TSelf* self) + : TTxBase(self) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + NIceDb::TNiceDb db(txc.DB); + + auto rowset = db.Table<Schema::Data>().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + ui64 key = rowset.GetValue<Schema::Data::Key>(); + ui64 value = rowset.GetValue<Schema::Data::Value>(); + Self->Data[key] = value; + if (!rowset.Next()) { + return false; + } + } + + return true; + } + + void Complete(const TActorContext& ctx) { + Self->SwitchToWork(ctx); + } + }; + + void RunTxInit() { + Execute(new TTxInit(this)); + } + + struct TTxWrite : public TTxBase { + TTxWrite(TSelf* self, TEvLeasesTablet::TEvWrite::TPtr&& ev) + : TTxBase(self) + , Ev(std::move(ev)) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) { + auto* msg = Ev->Get(); + + NIceDb::TNiceDb db(txc.DB); + + db.Table<Schema::Data>().Key(msg->Key).Update( + NIceDb::TUpdate<Schema::Data::Value>(msg->Value)); + return true; + } + + void Complete(const TActorContext& ctx) { + auto* msg = Ev->Get(); + Self->Data[msg->Key] = msg->Value; + + ctx.Send(Ev->Sender, new TEvLeasesTablet::TEvWriteAck(msg->Key), 0, Ev->Cookie); + } + + const TEvLeasesTablet::TEvWrite::TPtr Ev; + }; + + void Handle(TEvLeasesTablet::TEvWrite::TPtr& ev) { + Execute(new TTxWrite(this, std::move(ev))); + } + + void Handle(TEvLeasesTablet::TEvRead::TPtr& ev) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto key = ev->Get()->Key; + auto value = Data[key]; + Executor()->ConfirmReadOnlyLease([this, sender, cookie, key, value]() { + Send(sender, new TEvLeasesTablet::TEvReadResult(key, value), 0, cookie); + }); + } + + private: + void OnDetach(const TActorContext& ctx) override { + Die(ctx); + } + + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext& ctx) override { + Die(ctx); + } + + void OnActivateExecutor(const TActorContext&) override { + Become(&TThis::StateWork); + RunTxInitSchema(); + } + + void DefaultSignalTabletActive(const TActorContext&) override { + // nothing + } + + void SwitchToWork(const TActorContext& ctx) { + SignalTabletActive(ctx); + } + + bool ReadOnlyLeaseEnabled() override { + return EnableInitialLease; + } + + private: + STFUNC(StateInit) { + StateInitImpl(ev, ctx); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvLeasesTablet::TEvWrite, Handle); + hFunc(TEvLeasesTablet::TEvRead, Handle); + default: + HandleDefaultEvents(ev, ctx); + } + } + + private: + const bool EnableInitialLease; + THashMap<ui64, ui64> Data; + }; + + void DoBasics(bool deliverDropLease, bool enableInitialLease) { + TTestBasicRuntime runtime(2); + SetupTabletServices(runtime); + runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + + const ui64 tabletId = TTestTxConfig::TxTablet0; + + auto boot1 = CreateTestBootstrapper(runtime, + CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY), + [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) { + return new TLeasesTablet(tablet, info, enableInitialLease); + }); + runtime.EnableScheduleForActor(boot1); + + { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1)); + runtime.DispatchEvents(options); + } + + auto sender = runtime.AllocateEdgeActor(0); + auto pipe1 = runtime.ConnectToPipe(tabletId, sender, 0, NTabletPipe::TClientRetryPolicy::WithRetries()); + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvWrite(1, 11)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvWriteAck>(sender); + } + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + bool blockDropLease = true; + TVector<THolder<IEventHandle>> dropLeaseMsgs; + auto observerFunc = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTablet::TEvDropLease::EventType: + if (blockDropLease) { + dropLeaseMsgs.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + case TEvTablet::TEvTabletDead::EventType: + // Prevent tablets from restarting + // This is most important for the boot1 actor, since it + // quickly receives bad commit signal and tries to restart + // the original tablet. However we prevent executor from + // killing itself too, so we could make additional queries. + return TTestActorRuntime::EEventAction::DROP; + case TEvTablet::TEvDemoted::EventType: + // Block guardian from telling tablet about a new generation + return TTestActorRuntime::EEventAction::DROP; + case TEvStateStorage::TEvReplicaLeaderDemoted::EventType: + // Block replica from telling tablet about a new generation + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserver = runtime.SetObserverFunc(observerFunc); + + auto boot2 = CreateTestBootstrapper(runtime, + CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY), + [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) { + return new TLeasesTablet(tablet, info, enableInitialLease); + }, + /* node index */ 1); + runtime.EnableScheduleForActor(boot2); + + { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1)); + runtime.DispatchEvents(options); + } + + // After the new tablet is restored, we should still be able to read from the old tablet for a while + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + waitFor([&]{ return dropLeaseMsgs.size() == 1; }, "drop lease message"); + + // We expect the new tablet to send a drop lease message, but the old tablet must still be readable, because it didn't receive it yet + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + if (deliverDropLease) { + blockDropLease = false; + for (auto& ev : dropLeaseMsgs) { + runtime.Send(ev.Release(), 0, true); + } + dropLeaseMsgs.clear(); + } + + auto sender2 = runtime.AllocateEdgeActor(1); + auto pipe2 = runtime.ConnectToPipe(tabletId, sender2, 1, NTabletPipe::TClientRetryPolicy::WithRetries()); + runtime.SendToPipe(pipe2, sender2, new TEvLeasesTablet::TEvRead(1), 1); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender2); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u); + } + + runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1)); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender, TDuration::Seconds(1)); + UNIT_ASSERT(!ev); + } + } + + Y_UNIT_TEST(Basics) { + DoBasics(true, false); + } + + Y_UNIT_TEST(BasicsLeaseTimeout) { + DoBasics(false, false); + } + + Y_UNIT_TEST(BasicsInitialLease) { + DoBasics(true, true); + } + + Y_UNIT_TEST(BasicsInitialLeaseTimeout) { + DoBasics(false, true); + } +} + +} // namespace NKikimr::NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp index f69991ece9..258addbf7f 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp @@ -44,6 +44,18 @@ namespace NFlatExecutorSetup { if (launcherID) LauncherActorID = launcherID; } + + bool ITablet::ReadOnlyLeaseEnabled() { + return false; + } + + TDuration ITablet::ReadOnlyLeaseDuration() { + return TDuration::MilliSeconds(250); + } + + void ITablet::ReadOnlyLeaseDropped() { + // nothing by default + } } }} diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 3d550f30ba..f628f6cd42 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -452,6 +452,10 @@ namespace NFlatExecutorSetup { virtual void OnLeaderUserAuxUpdate(TString) { /* default */ } + virtual bool ReadOnlyLeaseEnabled(); + virtual TDuration ReadOnlyLeaseDuration(); + virtual void ReadOnlyLeaseDropped(); + // create transaction? protected: ITablet(TTabletStorageInfo *info, const TActorId &tablet) @@ -496,6 +500,10 @@ namespace NFlatExecutorSetup { virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0; + virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; + virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0; + virtual void ConfirmReadOnlyLease(std::function<void()> callback) = 0; + /* Make blob with data required for table bootstapping. Note: 1. Once non-trivial blob obtained and commited in tx all of its borrowed bundles have to be eventually released (see db). diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make index fd66d04202..d213ccbe49 100644 --- a/ydb/core/tablet_flat/ut/ya.make +++ b/ydb/core/tablet_flat/ut/ya.make @@ -22,6 +22,7 @@ SRCS( flat_executor_ut.cpp flat_executor_database_ut.cpp flat_executor_gclogic_ut.cpp + flat_executor_leases_ut.cpp flat_range_cache_ut.cpp flat_row_versions_ut.cpp flat_sausagecache_ut.cpp @@ -64,6 +65,7 @@ PEERDIR( ydb/core/scheme ydb/core/tablet_flat/test/libs/exec ydb/core/tablet_flat/test/libs/table + ydb/core/testlib ydb/library/yql/public/udf/service/exception_policy ) diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make index 6b1226bf01..1c3135c1eb 100644 --- a/ydb/core/tablet_flat/ya.make +++ b/ydb/core/tablet_flat/ya.make @@ -7,6 +7,7 @@ OWNER( SRCS( defs.h + flat_boot_lease.cpp flat_boot_misc.cpp flat_comp.cpp flat_comp_create.cpp diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 12e37f4d87..ff09f2df27 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -120,6 +120,9 @@ namespace NActors { node->LogSettings->MessagePrefix = " node " + ToString(nodeId); auto* nodeAppData = node->GetAppData<NKikimr::TAppData>(); + if (!UseRealThreads) { + nodeAppData->MonotonicTimeProvider = MonotonicTimeProvider; + } nodeAppData->DataShardExportFactory = app0->DataShardExportFactory; nodeAppData->DomainsInfo = app0->DomainsInfo; nodeAppData->ChannelProfiles = app0->ChannelProfiles; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 68438fc419..ccddfee1d1 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -207,9 +207,6 @@ namespace Tests { const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode; SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams); - CreateBootstrapTablets(); - SetupStorage(); - for (ui32 nodeIdx = 0; nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) { SetupDomainLocalService(nodeIdx); Runtime->GetAppData(nodeIdx).AuthConfig.MergeFrom(Settings->AuthConfig); @@ -223,6 +220,9 @@ namespace Tests { SetupConfigurators(nodeIdx); SetupProxies(nodeIdx); } + + CreateBootstrapTablets(); + SetupStorage(); } void TServer::SetupMessageBus(ui16 port, const TString &tracePath) { diff --git a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp index 8345d93485..e6a36f0007 100644 --- a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp +++ b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp @@ -96,11 +96,27 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE TEvAcquireReadStep"); + IncCounter(COUNTER_REQ_ACQUIRE_READ_STEP); + if (Y_UNLIKELY(Stopping)) { // We won't be able to commit anyway return; } + if (ReadOnlyLeaseEnabled()) { + // We acquire read step using a read-only lease from executor + // It is guaranteed that any future generation was not running at + // the time ConfirmReadOnlyLease was called. + TActorId sender = ev->Sender; + ui64 cookie = ev->Cookie; + ui64 step = Max(VolatileState.LastSentStep, VolatileState.LastAcquired); + VolatileState.LastAcquired = step; + Executor()->ConfirmReadOnlyLease([this, sender, cookie, step]() { + Send(sender, new TEvTxProxy::TEvAcquireReadStepResult(TabletID(), step), 0, cookie); + }); + return; + } + VolatileState.AcquireReadStepPending.emplace_back(ev->Sender, ev->Cookie); MaybeFlushAcquireReadStep(ctx); } diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp index 1fbc46f449..49aa706c77 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.cpp +++ b/ydb/core/tx/coordinator/coordinator_impl.cpp @@ -50,6 +50,8 @@ const ui32 TTxCoordinator::Schema::CurrentVersion = 1; TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) + , EnableLeaderLeases(0, 0, 1) + , MinLeaderLeaseDurationUs(250000, 1000, 5000000) #ifdef COORDINATOR_LOG_TO_FILE , DebugName(Sprintf("/tmp/coordinator_db_log_%" PRIu64 ".%" PRIi32 ".%" PRIu64 ".gz", TabletID(), getpid(), tablet.LocalId())) , DebugLogFile(DebugName) @@ -316,7 +318,28 @@ void TTxCoordinator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, cons } } +void TTxCoordinator::IcbRegister() { + if (!IcbRegistered) { + AppData()->Icb->RegisterSharedControl(EnableLeaderLeases, "CoordinatorControls.EnableLeaderLeases"); + AppData()->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "CoordinatorControls.MinLeaderLeaseDurationUs"); + IcbRegistered = true; + } +} + +bool TTxCoordinator::ReadOnlyLeaseEnabled() { + IcbRegister(); + ui64 value = EnableLeaderLeases; + return value != 0; +} + +TDuration TTxCoordinator::ReadOnlyLeaseDuration() { + IcbRegister(); + ui64 value = MinLeaderLeaseDurationUs; + return TDuration::MicroSeconds(value); +} + void TTxCoordinator::OnActivateExecutor(const TActorContext &ctx) { + IcbRegister(); TryInitMonCounters(ctx); Executor()->RegisterExternalTabletCounters(TabletCountersPtr); Execute(CreateTxSchema(), ctx); diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h index 28a817ae46..82f5f4b4d2 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.h +++ b/ydb/core/tx/coordinator/coordinator_impl.h @@ -8,6 +8,8 @@ #include <library/cpp/actors/helpers/mon_histogram_helper.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/tx_processing.h> +#include <ydb/core/control/immediate_control_board_wrapper.h> +#include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_exception.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -461,6 +463,10 @@ private: i64 CurrentTxInFly; }; + bool IcbRegistered = false; + TControlWrapper EnableLeaderLeases; + TControlWrapper MinLeaderLeaseDurationUs; + TVolatileState VolatileState; TConfig Config; TCoordinatorMonCounters MonCounters; @@ -503,6 +509,14 @@ private: return TActor::Die(ctx); } + void IcbRegister(); + bool ReadOnlyLeaseEnabled() override; + TDuration ReadOnlyLeaseDuration() override; + + void IncCounter(NFlatTxCoordinator::ECumulativeCounters counter, ui64 num = 1) { + TabletCounters->Cumulative()[counter].Increment(num); + } + void OnActivateExecutor(const TActorContext &ctx) override; void DefaultSignalTabletActive(const TActorContext &ctx) override { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 3a0de1dc81..71e28bfb17 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -137,6 +137,10 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , ReadColumnsScanInUserPool(0, 0, 1) , BackupReadAheadLo(0, 0, 64*1024*1024) , BackupReadAheadHi(0, 0, 128*1024*1024) + , EnablePrioritizedMvccSnapshotReads(0, 0, 1) + , EnableUnprotectedMvccSnapshotReads(0, 0, 1) + , EnableLeaderLeases(0, 0, 1) + , MinLeaderLeaseDurationUs(250000, 1000, 5000000) , DataShardSysTables(InitDataShardSysTables(this)) , ChangeSenderActivator(info->TabletID) , ChangeExchangeSplitter(this) @@ -262,6 +266,7 @@ void TDataShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorCo void TDataShard::Cleanup(const TActorContext& ctx) { //PipeClientCache->Detach(ctx); if (RegistrationSended) { + ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep()); ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnregisterTablet(TabletID())); } @@ -275,26 +280,54 @@ void TDataShard::Cleanup(const TActorContext& ctx) { } } -void TDataShard::OnActivateExecutor(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::OnActivateExecutor: tablet " << TabletID() << " actor " << ctx.SelfID); +void TDataShard::IcbRegister() { + if (!IcbRegistered) { + auto* appData = AppData(); + + appData->Icb->RegisterSharedControl(DisableByKeyFilter, "DataShardControls.DisableByKeyFilter"); + appData->Icb->RegisterSharedControl(MaxTxInFly, "DataShardControls.MaxTxInFly"); + appData->Icb->RegisterSharedControl(MaxTxLagMilliseconds, "DataShardControls.MaxTxLagMilliseconds"); + appData->Icb->RegisterSharedControl(DataTxProfileLogThresholdMs, "DataShardControls.DataTxProfile.LogThresholdMs"); + appData->Icb->RegisterSharedControl(DataTxProfileBufferThresholdMs, "DataShardControls.DataTxProfile.BufferThresholdMs"); + appData->Icb->RegisterSharedControl(DataTxProfileBufferSize, "DataShardControls.DataTxProfile.BufferSize"); + + appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets"); + appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit"); + appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent"); + appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds"); - AppData(ctx)->Icb->RegisterSharedControl(DisableByKeyFilter, "DataShardControls.DisableByKeyFilter"); - AppData(ctx)->Icb->RegisterSharedControl(MaxTxInFly, "DataShardControls.MaxTxInFly"); - AppData(ctx)->Icb->RegisterSharedControl(MaxTxLagMilliseconds, "DataShardControls.MaxTxLagMilliseconds"); - AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileLogThresholdMs, "DataShardControls.DataTxProfile.LogThresholdMs"); - AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileBufferThresholdMs, "DataShardControls.DataTxProfile.BufferThresholdMs"); - AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileBufferSize, "DataShardControls.DataTxProfile.BufferSize"); + appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled"); + appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool"); - AppData(ctx)->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets"); - AppData(ctx)->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit"); - AppData(ctx)->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent"); - AppData(ctx)->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds"); + appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo"); + appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi"); - AppData(ctx)->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled"); - AppData(ctx)->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool"); + appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads"); + appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads"); + + appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases"); + appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs"); + + IcbRegistered = true; + } +} + +bool TDataShard::ReadOnlyLeaseEnabled() { + IcbRegister(); + ui64 value = EnableLeaderLeases; + return value != 0; +} + +TDuration TDataShard::ReadOnlyLeaseDuration() { + IcbRegister(); + ui64 value = MinLeaderLeaseDurationUs; + return TDuration::MicroSeconds(value); +} + +void TDataShard::OnActivateExecutor(const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::OnActivateExecutor: tablet " << TabletID() << " actor " << ctx.SelfID); - AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo"); - AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi"); + IcbRegister(); // OnActivateExecutor might be called multiple times for a follower // but the counters should be initialized only once @@ -319,6 +352,16 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) { } void TDataShard::SwitchToWork(const TActorContext &ctx) { + if (IsMvccEnabled() && ( + SnapshotManager.GetPerformedUnprotectedReads() || + SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step)) + { + // We will need to wait until mediator state is fully restored before + // processing new immediate transactions. + MediatorStateWaiting = true; + CheckMediatorStateRestored(); + } + SyncConfig(); PlanQueue.Progress(ctx); OutReadSets.ResendAll(ctx); @@ -355,10 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) { LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast " << DatashardStateName(State) << " tabletId " << TabletID() << " mediators count is " << ProcessingParams->MediatorsSize() + << " coordinators count is " << ProcessingParams->CoordinatorsSize() << " buckets per mediator " << ProcessingParams->GetTimeCastBucketsPerMediator()); RegistrationSended = true; ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams)); + + // Subscribe to all known coordinators + for (ui64 coordinatorId : ProcessingParams->GetCoordinators()) { + size_t index = CoordinatorSubscriptions.size(); + auto res = CoordinatorSubscriptionById.emplace(coordinatorId, index); + if (res.second) { + auto& subscription = CoordinatorSubscriptions.emplace_back(); + subscription.CoordinatorId = coordinatorId; + ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvSubscribeReadStep(coordinatorId)); + ++CoordinatorSubscriptionsPending; + } + } } void TDataShard::PrepareAndSaveOutReadSets(ui64 step, @@ -1194,7 +1250,8 @@ TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const { if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId)) return TRowVersion(nextOp->GetStep(), nextOp->GetTxId()); - return TRowVersion((++edge).Step, ::Max<ui64>()); + TRowVersion candidate = TRowVersion((++edge).Step, ::Max<ui64>()); + return Max(candidate, SnapshotManager.GetImmediateWriteEdge()); } TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const { @@ -1218,6 +1275,9 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const // With read-only transactions we don't need reads to include // changes made at the incomplete edge, as that is a point where // distributed transactions performed some reads, not writes. + // Since incomplete transactions are still inflight, the actual + // version will stick to the first incomplete transaction is queue, + // effectively reading non-repeatable state before that transaction. edge = readEdge; break; case EMvccTxMode::ReadWrite: @@ -1235,12 +1295,41 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId)) return TRowVersion(nextOp->GetStep(), nextOp->GetTxId()); - // This is currently active step for immediate writes, not that when - // writeEdge is equal to some (PlanStep, Max<ui64>()) that means everything - // up to this point is "fixed" and cannot be changed. In that case we - // choose at least PlanStep + 1 for new writes. - ui64 writeStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, (++writeEdge).Step); - return TRowVersion(writeStep, ::Max<ui64>()); + // Normally we stick transactions to the end of the last known mediator step + // Note this calculations only happen when we don't have distributed + // transactions left in queue, and we won't have any more transactions + // up to the current mediator time. The mediator time itself may be stale, + // in which case we may have evidence of its higher value via complete and + // incomplete edges above. + const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step); + TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>()); + + switch (mode) { + case EMvccTxMode::ReadOnly: { + // We want to include everything that was potentially confirmed to + // users, but we don't want to include anything that is not replied + // at the start of this read. + // Note it's only possible to have ImmediateWriteEdge > mediatorEdge + // when ImmediateWriteEdge == mediatorEdge + 1 + return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied(), SnapshotManager.GetUnprotectedReadEdge()); + } + + case EMvccTxMode::ReadWrite: { + // We must use at least a previously used immediate write edge + // But we must also avoid trumpling over any unprotected mvcc + // snapshot reads that have occurred. + // Note it's only possible to go past the last known mediator step + // is when we had an unprotected read, which itself happens at the + // last mediator step. So we may only ever have a +1 step, never + // anything more. + TRowVersion postReadEdge = SnapshotManager.GetPerformedUnprotectedReads() + ? SnapshotManager.GetUnprotectedReadEdge().Next() + : TRowVersion::Min(); + return Max(mediatorEdge, writeEdge.Next(), postReadEdge, SnapshotManager.GetImmediateWriteEdge()); + } + } + + Y_FAIL("unreachable"); } TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const { @@ -1260,6 +1349,239 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const { return mvccVersion; } +TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges( + const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc) +{ + TPromotePostExecuteEdges res; + + res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc); + + switch (mode) { + case EPromotePostExecuteEdges::ReadOnly: + // We want read-only immediate transactions to be readonly, thus + // don't promote the complete edge unnecessarily. On restarts we + // will assume anything written is potentially replied anyway, + // even if it has never been read. + break; + + case EPromotePostExecuteEdges::RepeatableRead: { + bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads(); + if (unprotectedReads) { + // We want to use unprotected reads, but we need to make sure it's properly marked first + if (!SnapshotManager.GetPerformedUnprotectedReads()) { + SnapshotManager.SetPerformedUnprotectedReads(true, txc); + res.HadWrites = true; + } + if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) { + // We need to wait for completion until the flag is committed + res.WaitCompletion = true; + } + SnapshotManager.PromoteUnprotectedReadEdge(version); + } else if (SnapshotManager.GetPerformedUnprotectedReads()) { + // We want to drop the flag as soon as possible + SnapshotManager.SetPerformedUnprotectedReads(false, txc); + res.HadWrites = true; + } + + // We want to promote the complete edge when protected reads are + // used or when we're already writing something anyway. + if (res.HadWrites || !unprotectedReads) { + res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc); + if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) { + // We need to wait for completion because some other transaction + // has moved complete edge, but it's not committed yet. + res.WaitCompletion = true; + } + } + + break; + } + + case EPromotePostExecuteEdges::ReadWrite: { + if (version.Step <= GetMaxObservedStep()) { + res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc); + } + res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc); + break; + } + } + + return res; +} + +ui64 TDataShard::GetMaxObservedStep() const { + return Max( + Pipeline.GetLastPlannedTx().Step, + SnapshotManager.GetCompleteEdge().Step, + SnapshotManager.GetIncompleteEdge().Step, + SnapshotManager.GetUnprotectedReadEdge().Step, + MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0); +} + +void TDataShard::SendImmediateWriteResult( + const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie) +{ + const ui64 step = version.Step; + const ui64 observedStep = GetMaxObservedStep(); + if (step <= observedStep) { + SnapshotManager.PromoteImmediateWriteEdgeReplied(version); + Send(target, event, 0, cookie); + return; + } + + MediatorDelayedReplies.emplace( + std::piecewise_construct, + std::forward_as_tuple(version), + std::forward_as_tuple(target, THolder<IEventBase>(event), cookie)); + + // Try to subscribe to the next step, when needed + if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) { + MediatorTimeCastWaitingSteps.insert(step); + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast"); + } +} + +void TDataShard::SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie) { + if (IsFollower() || !ReadOnlyLeaseEnabled()) { + // We just send possibly stale result (old behavior) + Send(target, event, 0, cookie); + return; + } + + struct TSendState : public TThrRefBase { + TActorId Target; + THolder<IEventBase> Event; + ui64 Cookie; + + TSendState(const TActorId& target, IEventBase* event, ui64 cookie) + : Target(target) + , Event(event) + , Cookie(cookie) + { } + }; + + if (!readTime) { + readTime = AppData()->MonotonicTimeProvider->Now(); + } + + Executor()->ConfirmReadOnlyLease(readTime, [this, state = MakeIntrusive<TSendState>(target, event, cookie)] { + Send(state->Target, state->Event.Release(), 0, state->Cookie); + }); +} + +void TDataShard::SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie) { + SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie); +} + +void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) { + for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) { + const ui64 step = it->first.Step; + + if (step <= mediatorStep) { + SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first); + Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie); + it = MediatorDelayedReplies.erase(it); + continue; + } + + // Try to subscribe to the next step, when needed + if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) { + MediatorTimeCastWaitingSteps.insert(step); + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast"); + } + break; + } +} + +void TDataShard::CheckMediatorStateRestored() { + if (!MediatorStateWaiting || + !RegistrationSended || + !MediatorTimeCastEntry || + CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>()) + { + // We are not waiting or not ready to make a decision + if (MediatorStateWaiting && + MediatorTimeCastEntry && + CoordinatorPrevReadStepMax == Max<ui64>() && + !MediatorStateBackupInitiated) + { + // It is possible we don't have coordinators with new protocol support + // Use a backup plan of acquiring a read snapshot for restoring the read step + Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup); + MediatorStateBackupInitiated = true; + } + return; + } + + // CoordinatorPrevReadStepMax shows us what is the next minimum step that + // may be acquired as a snapshot. This tells as that no previous read + // could have happened after this step, even if it has been acquired. + // CoordinatorPrevReadStepMin shows us the maximum step that could have + // been acquired before we subscribed. Even if the next step is very + // large it may be used to infer an erlier step, as previous generation + // could not have read any step that was not acquired. + // When some coordinators are still pending we use CoordinatorPrevReadStepMax + // as a worst case read step in the future, hoping to make a tighter + // prediction while we wait for that. + // Note we always need to wait for CoordinatorPrevReadStepMax because + // previous generation may have observed it and may have replied to + // immediate writes at that step. + const ui64 waitStep = CoordinatorPrevReadStepMax; + const ui64 readStep = CoordinatorSubscriptionsPending == 0 + ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin) + : CoordinatorPrevReadStepMax; + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep); + + // WARNING: we must perform this check BEFORE we update unprotected read edge + // We may enter this code path multiple times, and we expect that the above + // read step may be refined while we wait based on pessimistic backup step. + if (GetMaxObservedStep() < waitStep) { + // We need to wait until we observe mediator step that is at least + // as large as the step we found. + if (MediatorTimeCastWaitingSteps.insert(waitStep).second) { + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), waitStep)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << waitStep << " from mediator time cast"); + } + return; + } + + // Using the inferred last read step we restore the pessimistic unprotected + // read edge. Note we only need to do so if there have actually been any + // unprotected reads in this datashard history. We also need to make sure + // this edge is at least one smaller than ImmediateWriteEdge when we know + // we started unconfirmed immediate writes in the last generation. + if (SnapshotManager.GetPerformedUnprotectedReads()) { + const TRowVersion lastReadEdge(readStep, Max<ui64>()); + const TRowVersion preImmediateWriteEdge = + SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step + ? SnapshotManager.GetImmediateWriteEdge().Prev() + : TRowVersion::Min(); + SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge)); + } + + // Promote the replied immediate write edge up to the currently observed step + // This is needed to make sure we read any potentially replied immediate + // writes before the restart, and conversely don't accidentally read any + // data that is definitely not replied yet. + if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) { + const TRowVersion edge(GetMaxObservedStep(), Max<ui64>()); + SnapshotManager.PromoteImmediateWriteEdgeReplied( + Min(edge, SnapshotManager.GetImmediateWriteEdge())); + } + + MediatorStateWaiting = false; + + // Resend all waiting messages + TVector<THolder<IEventHandle>> msgs; + msgs.swap(MediatorStateWaitingMsgs); + for (auto& ev : msgs) { + TActivationContext::Send(ev.Release()); + } +} + NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code) { using EResult = NMiniKQL::IEngineFlat::EResult; @@ -1410,7 +1732,7 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, rejectReasons.push_back("decided to reject due to given RejectProbability"); } - size_t totalInFly = (TxInFly() + ImmediateInFly() + ProposeQueue.Size() + TxWaiting()); + size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting()); if (totalInFly > GetMaxTxInFly()) { reject = true; rejectReasons.push_back("MaxTxInFly was exceeded"); @@ -1481,10 +1803,21 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* } void TDataShard::UpdateProposeQueueSize() const { - SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs()); + SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs()); } void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { + // Check if we need to delay an immediate transaction + if (MediatorStateWaiting && + (ev->Get()->GetFlags() & TTxFlags::Immediate) && + !(ev->Get()->GetFlags() & TTxFlags::ForceOnline)) + { + // We cannot calculate correct version until we restore mediator state + MediatorStateWaitingMsgs.emplace_back(ev.Release()); + UpdateProposeQueueSize(); + return; + } + if (Pipeline.HasProposeDelayers()) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored"); @@ -1897,7 +2230,30 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, MediatorTimeCastEntry = ev->Get()->Entry; Y_VERIFY(MediatorTimeCastEntry); + SendAfterMediatorStepActivate(MediatorTimeCastEntry->Get(TabletID())); + Pipeline.ActivateWaitingTxOps(ctx); + + CheckMediatorStateRestored(); +} + +void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx) { + auto* msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID() + << " coordinator " << msg->CoordinatorId + << " last step " << msg->LastReadStep + << " next step " << msg->ReadStep->Get()); + auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId); + Y_VERIFY_S(it != CoordinatorSubscriptionById.end(), + "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId); + size_t index = it->second; + auto& subscription = CoordinatorSubscriptions.at(index); + subscription.ReadStep = msg->ReadStep; + CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep); + CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep); + --CoordinatorSubscriptionsPending; + CheckMediatorStateRestored(); } void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx) { @@ -1911,13 +2267,27 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end() && *it <= step;) it = MediatorTimeCastWaitingSteps.erase(it); + SendAfterMediatorStepActivate(step); + Pipeline.ActivateWaitingTxOps(ctx); + + CheckMediatorStateRestored(); +} + +void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) { + if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) { + // We are still waiting for new protol coordinator state + // TODO: send an old snapshot request to coordinators + } } bool TDataShard::WaitPlanStep(ui64 step) { if (step <= Pipeline.GetLastPlannedTx().Step) return false; + if (step <= SnapshotManager.GetCompleteEdge().Step) + return false; + if (MediatorTimeCastEntry && step <= MediatorTimeCastEntry->Get(TabletID())) return false; @@ -1943,7 +2313,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr auto &rec = ev->Get()->Record; if (rec.HasMvccSnapshot()) { TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId()); - TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(); + TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads()); if (rowVersion >= unreadableEdge) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge); return true; diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 3b3c9d9d6f..c55cf209f5 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -132,6 +132,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c } void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx) { + if (MediatorStateWaiting) { + MediatorStateWaitingMsgs.emplace_back(ev.Release()); + UpdateProposeQueueSize(); + return; + } if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) { Executor()->Execute(new TTxUploadRows(this, ev), ctx); } else { @@ -140,6 +145,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct } void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx) { + if (MediatorStateWaiting) { + MediatorStateWaitingMsgs.emplace_back(ev.Release()); + UpdateProposeQueueSize(); + return; + } if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) { Executor()->Execute(new TTxEraseRows(this, ev), ctx); } else { diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index c5d2bf68c6..4128209133 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -209,6 +209,21 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } template <typename TEvRequest, typename TEvResponse> +void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie) { + Y_VERIFY(Result); + + if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) { + self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS); + } else { + self->IncCounter(COUNTER_BULK_UPSERT_ERROR); + } + + target = Ev->Sender; + event = std::move(Result); + cookie = 0; +} + +template <typename TEvRequest, typename TEvResponse> void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) { Y_VERIFY(Result); diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h index 262027b14f..928558de2d 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.h +++ b/ydb/core/tx/datashard/datashard_common_upload.h @@ -23,6 +23,7 @@ public: protected: bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion); + void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie); void SendResult(TDataShard* self, const TActorContext& ctx); TVector<IChangeCollector::TChange> GetCollectedChanges() const; diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 826819596f..574c5ff647 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -204,7 +204,7 @@ bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc, return true; } -void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) { +TDirectTxResult TDirectTxErase::GetResult(TDataShard* self) { Y_VERIFY(Result); if (Result->Record.GetStatus() == NKikimrTxDataShard::TEvEraseRowsResponse::OK) { @@ -213,7 +213,11 @@ void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) { self->IncCounter(COUNTER_ERASE_ROWS_ERROR); } - ctx.Send(Ev->Sender, std::move(Result)); + TDirectTxResult res; + res.Target = Ev->Sender; + res.Event = std::move(Result); + res.Cookie = 0; + return res; } TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxErase::GetCollectedChanges() const { diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h index 689b868a36..7663b47d38 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.h +++ b/ydb/core/tx/datashard/datashard_direct_erase.h @@ -66,7 +66,7 @@ public: NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error); bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override; - void SendResult(TDataShard* self, const TActorContext& ctx) override; + TDirectTxResult GetResult(TDataShard* self) override; TVector<IChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp index 145f0936cd..858247f36a 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp @@ -31,16 +31,25 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded) } bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) { - auto [readVersion, writeVersion] = self->GetReadWriteVersions(); + auto [readVersion, writeVersion] = self->GetReadWriteVersions(this); if (!Impl->Execute(self, txc, readVersion, writeVersion)) return false; - self->PromoteCompleteEdge(writeVersion.Step, txc); + if (self->IsMvccEnabled()) { + // Note: we always wait for completion, so we can ignore the result + self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); + } + return true; } void TDirectTransaction::SendResult(TDataShard* self, const TActorContext& ctx) { - Impl->SendResult(self, ctx); + auto result = Impl->GetResult(self); + if (MvccReadWriteVersion) { + self->SendImmediateWriteResult(*MvccReadWriteVersion, result.Target, result.Event.Release(), result.Cookie); + } else { + ctx.Send(result.Target, result.Event.Release(), 0, result.Cookie); + } } TVector<NMiniKQL::IChangeCollector::TChange> TDirectTransaction::GetCollectedChanges() const { diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h index 5b42e1f1d7..e4d83188d5 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.h +++ b/ydb/core/tx/datashard/datashard_direct_transaction.h @@ -11,11 +11,17 @@ namespace NKikimr { namespace NDataShard { +struct TDirectTxResult { + TActorId Target; + THolder<IEventBase> Event; + ui64 Cookie; +}; + class IDirectTx { public: virtual ~IDirectTx() = default; virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0; - virtual void SendResult(TDataShard* self, const TActorContext& ctx) = 0; + virtual TDirectTxResult GetResult(TDataShard* self) = 0; virtual TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const = 0; }; diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp index 6173c0e218..abb3eda763 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.cpp +++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp @@ -12,8 +12,10 @@ bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion); } -void TDirectTxUpload::SendResult(TDataShard* self, const TActorContext& ctx) { - TCommonUploadOps::SendResult(self, ctx); +TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) { + TDirectTxResult res; + TCommonUploadOps::GetResult(self, res.Target, res.Event, res.Cookie); + return res; } TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxUpload::GetCollectedChanges() const { diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h index 7ca84f19a7..a26226bd79 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.h +++ b/ydb/core/tx/datashard/datashard_direct_upload.h @@ -14,7 +14,7 @@ public: explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev); bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override; - void SendResult(TDataShard* self, const TActorContext& ctx) override; + TDirectTxResult GetResult(TDataShard* self) override; TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b5a5abfa6f..e5934ac2be 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -295,6 +295,7 @@ class TDataShard EvRequestChangeRecords, EvRemoveChangeRecords, EvReplicationSourceOffsets, + EvMediatorRestoreBackup, EvEnd }; @@ -434,6 +435,8 @@ class TDataShard // Note that keys are NOT sorted in any way THashMap<TString, TVector<TSplitKey>> SourceOffsets; }; + + struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {}; }; struct Schema : NIceDb::Schema { @@ -791,6 +794,10 @@ class TDataShard Sys_NextChangeRecordOrder, // 36 Next order of change record Sys_LastChangeRecordGroup, // 37 Last group number of change records + SysMvcc_UnprotectedReads, // 38 Shard may have performed unprotected mvcc reads when non-zero + SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled + SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled + // reserved SysPipeline_Flags = 1000, SysPipeline_LimitActiveTx, @@ -800,6 +807,9 @@ class TDataShard static_assert(ESysTableKeys::Sys_SubDomainOwnerId == 33, "Sys_SubDomainOwnerId changed its value"); static_assert(ESysTableKeys::Sys_SubDomainLocalPathId == 34, "Sys_SubDomainLocalPathId changed its value"); static_assert(ESysTableKeys::Sys_SubDomainOutOfSpace == 35, "Sys_SubDomainOutOfSpace changed its value"); + static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value"); + static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value"); + static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value"); static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000 @@ -890,7 +900,9 @@ class TDataShard void Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx); void Handle(TEvDataShard::TEvReturnBorrowedPart::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvReturnBorrowedPartAck::TPtr& ev, const TActorContext& ctx); @@ -1001,6 +1013,9 @@ class TDataShard void OnStopGuardStarting(const TActorContext &ctx); void OnStopGuardComplete(const TActorContext &ctx); void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) override; + void IcbRegister(); + bool ReadOnlyLeaseEnabled() override; + TDuration ReadOnlyLeaseDuration() override; void OnActivateExecutor(const TActorContext &ctx) override; void Cleanup(const TActorContext &ctx); @@ -1214,6 +1229,16 @@ public: return BackupReadAheadHi; } + bool GetEnablePrioritizedMvccSnapshotReads() const { + ui64 value = EnablePrioritizedMvccSnapshotReads; + return value != 0; + } + + bool GetEnableUnprotectedMvccSnapshotReads() const { + ui64 value = EnableUnprotectedMvccSnapshotReads; + return value != 0; + } + template <typename T> void ReleaseCache(T& tx) { ReleaseTxCache(tx.GetTxCacheUsage()); @@ -1400,7 +1425,28 @@ public: // Returns a suitable row version for performing a transaction TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const; + enum class EPromotePostExecuteEdges { + ReadOnly, + RepeatableRead, + ReadWrite, + }; + + struct TPromotePostExecuteEdges { + bool HadWrites = false; + bool WaitCompletion = false; + }; + TReadWriteVersions GetReadWriteVersions(TOperation* op = nullptr) const; + TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges( + const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc); + ui64 GetMaxObservedStep() const; + void SendImmediateWriteResult( + const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0); + void SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie = 0); + void SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie = 0); + void SendAfterMediatorStepActivate(ui64 mediatorStep); + + void CheckMediatorStateRestored(); void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const; @@ -1948,8 +1994,38 @@ private: TS3UploadsManager S3Uploads; TS3DownloadsManager S3Downloads; + struct TMediatorDelayedReply { + TActorId Target; + THolder<IEventBase> Event; + ui64 Cookie; + + TMediatorDelayedReply(const TActorId& target, THolder<IEventBase> event, ui64 cookie) + : Target(target) + , Event(std::move(event)) + , Cookie(cookie) + { } + }; + TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry; TSet<ui64> MediatorTimeCastWaitingSteps; + TMultiMap<TRowVersion, TMediatorDelayedReply> MediatorDelayedReplies; + + struct TCoordinatorSubscription { + ui64 CoordinatorId; + TMediatorTimecastReadStep::TCPtr ReadStep; + }; + + TVector<TCoordinatorSubscription> CoordinatorSubscriptions; + THashMap<ui64, size_t> CoordinatorSubscriptionById; + size_t CoordinatorSubscriptionsPending = 0; + ui64 CoordinatorPrevReadStepMin = 0; + ui64 CoordinatorPrevReadStepMax = Max<ui64>(); + + TVector<THolder<IEventHandle>> MediatorStateWaitingMsgs; + bool MediatorStateWaiting = false; + bool MediatorStateBackupInitiated = false; + + bool IcbRegistered = false; TControlWrapper DisableByKeyFilter; TControlWrapper MaxTxInFly; @@ -1969,6 +2045,12 @@ private: TControlWrapper BackupReadAheadLo; TControlWrapper BackupReadAheadHi; + TControlWrapper EnablePrioritizedMvccSnapshotReads; + TControlWrapper EnableUnprotectedMvccSnapshotReads; + + TControlWrapper EnableLeaderLeases; + TControlWrapper MinLeaderLeaseDurationUs; + // Set of InRS keys to remove from local DB. THashSet<TReadSetKey> InRSToRemove; TIntrusivePtr<TThrRefBase> DataShardSysTables; @@ -2093,6 +2175,9 @@ protected: TRACE_EVENT(NKikimrServices::TX_DATASHARD); switch (ev->GetTypeRewrite()) { HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); + HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle); + HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); + HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); HFuncTraced(TEvents::TEvPoisonPill, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { @@ -2142,7 +2227,9 @@ protected: HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle); HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle); HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); + HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle); HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); + HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); HFuncTraced(TEvDataShard::TEvCancelTransactionProposal, Handle); HFuncTraced(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); HFunc(TEvDataShard::TEvReturnBorrowedPart, Handle); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 3498162c17..faf90e403b 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1581,19 +1581,21 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co if (Self->MvccSwitchState == TSwitchState::SWITCHING) { WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev)); // postpone tx processing till mvcc state switch is finished } else { + bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); Y_VERIFY_DEBUG(ev->Get()->Record.HasMvccSnapshot()); TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId()); WaitingDataTxOps.emplace(snapshot, std::move(ev)); + const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1; TRowVersion unreadableEdge; - if (!Self->WaitPlanStep(snapshot.Step + 1) && snapshot < (unreadableEdge = GetUnreadableEdge())) { - ActivateWaitingTxOps(unreadableEdge, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op + if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) { + ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op } } return true; } -void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) { +void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) { if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING) return; @@ -1611,8 +1613,12 @@ void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) activated = true; } - if (minWait == TRowVersion::Max() || Self->WaitPlanStep(minWait.Step + 1) || minWait >= (edge = GetUnreadableEdge())) + if (minWait == TRowVersion::Max() || + Self->WaitPlanStep(prioritizedReads ? minWait.Step : minWait.Step + 1) || + minWait >= (edge = GetUnreadableEdge(prioritizedReads))) + { break; + } // Async MediatorTimeCastEntry update, need to rerun activation } @@ -1626,7 +1632,8 @@ void TPipeline::ActivateWaitingTxOps(const TActorContext& ctx) { if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING) return; - ActivateWaitingTxOps(GetUnreadableEdge(), ctx); + bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); + ActivateWaitingTxOps(GetUnreadableEdge(prioritizedReads), prioritizedReads, ctx); } TRowVersion TPipeline::GetReadEdge() const { @@ -1645,29 +1652,51 @@ TRowVersion TPipeline::GetReadEdge() const { return TRowVersion(step, Max<ui64>()); } -TRowVersion TPipeline::GetUnreadableEdge() const { - auto last = TRowVersion( +TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const { + const auto last = TRowVersion( GetLastActivePlannedOpStep(), GetLastActivePlannedOpId()); auto it = Self->TransQueue.PlannedTxs.upper_bound(TStepOrder(last.Step, last.TxId)); while (it != Self->TransQueue.PlannedTxs.end()) { - last = TRowVersion(it->Step, it->TxId); - if (!Self->TransQueue.FindTxInFly(last.TxId)->IsReadOnly()) { + const auto next = TRowVersion(it->Step, it->TxId); + if (!Self->TransQueue.FindTxInFly(next.TxId)->IsReadOnly()) { // If there's any non-read-only planned tx we don't have in the // dependency tracker yet, we absolutely cannot read from that // version. - return last; + return next; } ++it; } - // It looks like we have an empty plan queue (or it's read-only), so we - // use a rough estimate of a point we would use for immediate writes in - // the far future. That point in time is not complete yet and cannot be - // used for snapshot reads. - last = TRowVersion(LastPlannedTx.Step, LastPlannedTx.TxId); - ui64 step = Max(Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0, (++last).Step); - return TRowVersion(step, Max<ui64>()); + // It looks like we have an empty plan queue (or it's read-only), so we use + // a rough estimate of a point in time we would use for immediate writes + // in the distant future. That point in time possibly has some unfinished + // transactions, but they would be resolved using dependency tracker. Here + // we use an estimate of the observed mediator step (including possible past + // generations). Note that we also update CompleteEdge when the distributed + // queue is empty, but we have been performing immediate writes and thus + // observing an updated mediator timecast step. + const ui64 mediatorStep = Max( + Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0, + Self->SnapshotManager.GetIncompleteEdge().Step, + Self->SnapshotManager.GetCompleteEdge().Step, + LastPlannedTx.Step); + + // Using an observed mediator step we conclude that we have observed all + // distributed transactions up to the end of that step. + const TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>()); + + if (prioritizeReads) { + // We are prioritizing reads, and we are ok with blocking immediate writes + // in the current step. So the first unreadable version is actually in + // the next step. + return mediatorEdge.Next(); + } else { + // We cannot block immediate writes up to this edge, thus we actually + // need to wait until the edge progresses above this version. This + // would happen when mediator timecast moves to the next step. + return mediatorEdge; + } } void TPipeline::AddCompletingOp(const TOperation::TPtr& op) { diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 67c35260b2..2d0d76b100 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -329,11 +329,11 @@ public: ui64 WaitingTxs() const { return WaitingDataTxOps.size(); } bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); - void ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx); + void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx); void ActivateWaitingTxOps(const TActorContext& ctx); TRowVersion GetReadEdge() const; - TRowVersion GetUnreadableEdge() const; + TRowVersion GetUnreadableEdge(bool prioritizedReads) const; void AddCompletingOp(const TOperation::TPtr& op); void RemoveCompletingOp(const TOperation::TPtr& op); diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp index da1219bb9e..39426396f3 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_snapshots.cpp @@ -30,9 +30,11 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { TRowVersion completeEdge = TRowVersion::Min(); TRowVersion incompleteEdge = TRowVersion::Min(); TRowVersion lowWatermark = TRowVersion::Min(); + TRowVersion immediateWriteEdge = TRowVersion::Min(); ui32 mvccState = 0; ui64 keepSnapshotTimeout = 0; + ui64 unprotectedReads = 0; TSnapshotMap snapshots; @@ -43,12 +45,15 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { // We don't currently support mvcc on the follower ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState); ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads); ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step); ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId); ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step); ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId); ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step); ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId); } { @@ -86,9 +91,19 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { MinWriteVersion = minWriteVersion; MvccState = static_cast<EMvccState>(mvccState); KeepSnapshotTimeout = keepSnapshotTimeout; + PerformedUnprotectedReads = (unprotectedReads != 0); CompleteEdge = completeEdge; IncompleteEdge = incompleteEdge; LowWatermark = lowWatermark; + ImmediateWriteEdge = immediateWriteEdge; + if (ImmediateWriteEdge.Step <= Max(CompleteEdge.Step, IncompleteEdge.Step)) { + ImmediateWriteEdgeReplied = immediateWriteEdge; + } else { + // We cannot be sure which writes we have replied to + // Datashard will restore mediator state and decide + ImmediateWriteEdgeReplied.Step = Max(CompleteEdge.Step, IncompleteEdge.Step); + ImmediateWriteEdgeReplied.TxId = Max<ui64>(); + } CommittedCompleteEdge = completeEdge; Snapshots = std::move(snapshots); } @@ -208,6 +223,82 @@ bool TSnapshotManager::PromoteIncompleteEdge(TOperation* op, TTransactionContext return false; } +TRowVersion TSnapshotManager::GetImmediateWriteEdge() const { + return ImmediateWriteEdge; +} + +TRowVersion TSnapshotManager::GetImmediateWriteEdgeReplied() const { + return ImmediateWriteEdgeReplied; +} + +void TSnapshotManager::SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) { + using Schema = TDataShard::Schema; + + NIceDb::TNiceDb db(txc.DB); + Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeStep, version.Step); + Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, version.TxId); + ImmediateWriteEdge = version; +} + +bool TSnapshotManager::PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) { + if (!IsMvccEnabled()) + return false; + + if (version > ImmediateWriteEdge) { + SetImmediateWriteEdge(version, txc); + + return true; + } + + return false; +} + +bool TSnapshotManager::PromoteImmediateWriteEdgeReplied(const TRowVersion& version) { + if (!IsMvccEnabled()) + return false; + + if (version > ImmediateWriteEdgeReplied) { + ImmediateWriteEdgeReplied = version; + return true; + } + + return false; +} + +TRowVersion TSnapshotManager::GetUnprotectedReadEdge() const { + return UnprotectedReadEdge; +} + +bool TSnapshotManager::PromoteUnprotectedReadEdge(const TRowVersion& version) { + if (IsMvccEnabled() && UnprotectedReadEdge < version) { + UnprotectedReadEdge = version; + return true; + } + + return false; +} + +bool TSnapshotManager::GetPerformedUnprotectedReads() const { + return PerformedUnprotectedReads; +} + +bool TSnapshotManager::IsPerformedUnprotectedReadsCommitted() const { + return PerformedUnprotectedReadsUncommitted != 0; +} + +void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc) { + using Schema = TDataShard::Schema; + + NIceDb::TNiceDb db(txc.DB); + Self->PersistSys(db, Schema::SysMvcc_UnprotectedReads, ui64(performedUnprotectedReads ? 1 : 0)); + PerformedUnprotectedReads = performedUnprotectedReads; + PerformedUnprotectedReadsUncommitted++; + + txc.OnCommitted([this] { + this->PerformedUnprotectedReadsUncommitted--; + }); +} + void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) { using Schema = TDataShard::Schema; @@ -275,7 +366,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext const TRowVersion opVersion(step, txId); // We need to choose a version that is at least as large as all previous edges - TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge); + TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge, ImmediateWriteEdge); // This must be a version that we may have previously written to, and which // must not be a snapshot. We don't know if there have been any immediate @@ -300,7 +391,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext SetCompleteEdge(nicedb, nextVersion); SetIncompleteEdge(nicedb, nextVersion); + SetImmediateWriteEdge(nextVersion, txc); SetLowWatermark(nicedb, nextVersion); + ImmediateWriteEdgeReplied = ImmediateWriteEdge; break; } @@ -316,7 +409,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext const auto minVersion = TRowVersion::Min(); SetCompleteEdge(nicedb, minVersion); SetIncompleteEdge(nicedb, minVersion); + SetImmediateWriteEdge(minVersion, txc); SetLowWatermark(nicedb, minVersion); + ImmediateWriteEdgeReplied = ImmediateWriteEdge; break; } diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h index 8472c5e274..c6a1c8d1f3 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.h +++ b/ydb/core/tx/datashard/datashard_snapshots.h @@ -172,6 +172,19 @@ public: bool PromoteIncompleteEdge(TOperation* op, TTransactionContext& txc); + TRowVersion GetImmediateWriteEdge() const; + TRowVersion GetImmediateWriteEdgeReplied() const; + void SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc); + bool PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc); + bool PromoteImmediateWriteEdgeReplied(const TRowVersion& version); + + TRowVersion GetUnprotectedReadEdge() const; + bool PromoteUnprotectedReadEdge(const TRowVersion& version); + + bool GetPerformedUnprotectedReads() const; + bool IsPerformedUnprotectedReadsCommitted() const; + void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc); + EMvccState GetMvccState() const { return MvccState; } @@ -244,9 +257,14 @@ private: EMvccState MvccState = EMvccState::MvccUnspecified; ui64 KeepSnapshotTimeout = 0; + bool PerformedUnprotectedReads = false; + ui64 PerformedUnprotectedReadsUncommitted = 0; TRowVersion IncompleteEdge = TRowVersion::Min(); TRowVersion CompleteEdge = TRowVersion::Min(); TRowVersion LowWatermark = TRowVersion::Min(); + TRowVersion ImmediateWriteEdge = TRowVersion::Min(); + TRowVersion ImmediateWriteEdgeReplied = TRowVersion::Min(); + TRowVersion UnprotectedReadEdge = TRowVersion::Min(); TRowVersion CommittedCompleteEdge = TRowVersion::Min(); diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index 4a9ab00878..ce6a08be45 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -282,6 +282,9 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_UNSAFE_UPLOAD_ROWS; } + +private: + TRowVersion MvccVersion = TRowVersion::Min(); }; class TDataShard::TTxExecuteMvccStateChange: public NTabletFlatExecutor::TTransactionBase<TDataShard> { diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp index 273952a06d..cb085ad480 100644 --- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp +++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp @@ -14,12 +14,26 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) return false; - Self->PromoteCompleteEdge(writeVersion.Step, txc); + if (Self->IsMvccEnabled()) { + // Note: we always wait for completion, so we can ignore the result + Self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); + MvccVersion = writeVersion; + } + return true; } void TDataShard::TTxUnsafeUploadRows::Complete(const TActorContext& ctx) { - TCommonUploadOps::SendResult(Self, ctx); + TActorId target; + THolder<IEventBase> event; + ui64 cookie; + TCommonUploadOps::GetResult(Self, target, event, cookie); + + if (MvccVersion) { + Self->SendImmediateWriteResult(MvccVersion, target, event.Release(), cookie); + } else { + ctx.Send(target, event.Release(), 0, cookie); + } } } // NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index d2407cbf60..33197f23be 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1748,6 +1748,10 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId) { void SimulateSleep(Tests::TServer::TPtr server, TDuration duration) { auto &runtime = *server->GetRuntime(); + SimulateSleep(runtime, duration); +} + +void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) { auto sender = runtime.AllocateEdgeActor(); runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration); runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 9ee4a455a8..e72cf689fe 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -610,6 +610,7 @@ void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId) void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId); void SimulateSleep(Tests::TServer::TPtr server, TDuration duration); +void SimulateSleep(TTestActorRuntime& runtime, TDuration duration); void SendSQL(Tests::TServer::TPtr server, TActorId sender, @@ -635,4 +636,21 @@ struct IsTxResultComplete { void WaitTabletBecomesOffline(Tests::TServer::TPtr server, ui64 tabletId); +/// +class TDisableDataShardLogBatching : public TNonCopyable { +public: + TDisableDataShardLogBatching() + : PrevValue(NDataShard::gAllowLogBatchingDefaultValue) + { + NDataShard::gAllowLogBatchingDefaultValue = false; + } + + ~TDisableDataShardLogBatching() { + NDataShard::gAllowLogBatchingDefaultValue = PrevValue; + } + +private: + const bool PrevValue; +}; + } diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index b8a7873652..e5c976dd35 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -124,6 +124,14 @@ namespace NKqpHelpers { return ev->Get()->Record.GetResponse().GetSessionId(); } + inline void CloseSession(TTestActorRuntime& runtime, TActorId sender, const TString& sessionId) { + auto request = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + request->Record.MutableRequest()->SetSessionId(sessionId); + runtime.Send( + new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()), + 0, /* via actor system */ true); + } + inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest( const TActorId sender, const TString& sql, diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index f2b8c97cae..edde89e3c5 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -4595,6 +4595,541 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterStuckRW, UseNewEngine) { } } +Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(UnprotectedReads ? 1 : 0); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetControls(controls) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + auto table1shards = GetTableShards(server, sender, "/Root/table-1"); + auto table2shards = GetTableShards(server, sender, "/Root/table-2"); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + // Perform an immediate write + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)")); + + auto execSimpleRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + txId = response.GetResponse().GetTxMeta().id(); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto execSnapshotRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + TString sessionId, txId; + TString result = beginSnapshotRequest(sessionId, txId, query); + CloseSession(runtime, reqSender, sessionId); + return result; + }; + + // Perform an immediate read, we should observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Send an immediate write transaction, but don't wait for result + auto senderImmediateWrite = runtime.AllocateEdgeActor(); + SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5) + )"))); + + // We sleep for very little so datashard commits changes, but doesn't advance + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Perform an immediate read again, it should NOT observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Wait for the write to finish + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + // Perform an immediate read again, it should observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Start a new write and sleep again + SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7) + )"))); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Verify this write is not observed yet + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Reboot the tablet + RebootTablet(runtime, table1shards.at(0), sender); + + // Verify the write above cannot be observed after restart as well + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Send one more write and sleep again + auto senderImmediateWrite2 = runtime.AllocateEdgeActor(); + SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9) + )"))); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Verify it is also hidden at the moment + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Wait for result of the second write + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + // We should finally observe both writes + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + TString snapshotSessionId, snapshotTxId; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Reboot the tablet + RebootTablet(runtime, table1shards.at(0), sender); + + // Upsert new data after reboot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (11, 11)")); + + // Make sure datashard state is restored correctly and snapshot is not corrupted + UNIT_ASSERT_VALUES_EQUAL( + continueSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + // Make sure new snapshot will actually observe new data + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "List { Struct { Optional { Uint32: 11 } } Struct { Optional { Uint32: 11 } } } " + "} Struct { Bool: false }"); +} + +Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetControls(controls) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE); + + InitRoot(server, sender); + + const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain); + + struct TNodeState { + // mediator -> bucket -> [observed, passed] step + THashMap<ui64, THashMap<ui32, std::pair<ui64, ui64>>> Steps; + ui64 AllowedStep = 0; + }; + THashMap<ui32, TNodeState> mediatorState; + + bool mustWaitForSteps[2] = { false, false }; + + auto captureTimecast = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + const ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + const ui32 nodeIndex = nodeId - runtime.GetNodeId(0); + switch (ev->GetTypeRewrite()) { + case TEvMediatorTimecast::TEvUpdate::EventType: { + auto* msg = ev->Get<TEvMediatorTimecast::TEvUpdate>(); + const ui64 mediatorId = msg->Record.GetMediator(); + const ui32 bucket = msg->Record.GetBucket(); + ui64 step = msg->Record.GetTimeBarrier(); + auto& state = mediatorState[nodeId]; + if (!mustWaitForSteps[nodeIndex]) { + // Automatically allow all new steps + state.AllowedStep = Max(state.AllowedStep, step); + } + Cerr << "... node " << nodeId << " observed update from " << mediatorId + << " for bucket " << bucket + << " to step " << step + << " (allowed " << state.AllowedStep << ")" + << Endl; + auto& [observedStep, passedStep] = state.Steps[mediatorId][bucket]; + observedStep = Max(observedStep, step); + if (step >= passedStep) { + if (step < state.AllowedStep) { + step = state.AllowedStep; + msg->Record.SetTimeBarrier(step); + Cerr << "... shifted to allowed step " << step << Endl; + } + passedStep = step; + break; + } + return TTestActorRuntime::EEventAction::DROP; + } + case TEvMediatorTimecast::TEvWaitPlanStep::EventType: { + const auto* msg = ev->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); + const ui64 tabletId = msg->TabletId; + const ui64 step = msg->PlanStep; + Cerr << "... node " << nodeId << " observed wait by " << tabletId + << " for step " << step + << Endl; + auto& state = mediatorState[nodeId]; + if (state.AllowedStep < step) { + state.AllowedStep = step; + for (auto& kv1 : state.Steps) { + const ui64 mediatorId = kv1.first; + for (auto& kv2 : kv1.second) { + const ui32 bucket = kv2.first; + auto& [observedStep, passedStep] = kv2.second; + if (passedStep < step && passedStep < observedStep) { + passedStep = Min(step, observedStep); + auto* update = new TEvMediatorTimecast::TEvUpdate(); + update->Record.SetMediator(mediatorId); + update->Record.SetBucket(bucket); + update->Record.SetTimeBarrier(passedStep); + runtime.Send(new IEventHandle(ev->GetRecipientRewrite(), ev->GetRecipientRewrite(), update), nodeIndex, /* viaActorSystem */ true); + } + } + } + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureTimecast); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + auto table1shards = GetTableShards(server, sender, "/Root/table-1"); + + // Make sure tablet is at node 1 + runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0))); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender); + UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK); + } + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + auto execSimpleRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + txId = response.GetResponse().GetTxMeta().id(); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto execSnapshotRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + TString sessionId, txId; + TString result = beginSnapshotRequest(sessionId, txId, query); + CloseSession(runtime, reqSender, sessionId); + return result; + }; + + // Perform an immediate read, we should observe the initial write + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Stop updating mediator timecast on the second node + mustWaitForSteps[1] = true; + + // Insert a new row and wait for result + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)")); + + // Make sure tablet is at node 2 + runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(1))); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender); + UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK); + } + + // Perform an immediate read, we should observe confirmed writes after restart + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); + + // Previous snapshot must see original data + UNIT_ASSERT_VALUES_EQUAL( + continueSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // However new snapshots must see updated data + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); +} + } // Y_UNIT_TEST_SUITE(DataShardOutOfOrder) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index b32f635cc0..105062ee06 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -23,7 +23,6 @@ public: private: void ExecuteDataTx(TOperation::TPtr op, - TTransactionContext& txc, const TActorContext& ctx); void AddLocksToResult(TOperation::TPtr op); }; @@ -124,7 +123,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, try { try { - ExecuteDataTx(op, txc, ctx); + ExecuteDataTx(op, ctx); } catch (const TNotReadyTabletException&) { // We want to try pinning (actually precharging) all required pages // before restarting the transaction, to minimize future restarts. @@ -169,7 +168,6 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, - TTransactionContext& txc, const TActorContext& ctx) { TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); IEngineFlat* engine = tx->GetDataTx()->GetEngine(); @@ -237,9 +235,6 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, } else { result->SetTxResult(engine->GetShardReply(DataShard.TabletID())); - if (op->IsImmediate() && !op->IsReadOnly()) - DataShard.PromoteCompleteEdge(writeVersion.Step, txc); - if (auto changes = tx->GetDataTx()->GetCollectedChanges()) { op->ChangeRecords().reserve(changes.size()); for (const auto& change : changes) { diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index ed5d912a41..1fd5f15f01 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -183,10 +183,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio AddLocksToResult(op, ctx); - if (op->IsImmediate() && !op->IsReadOnly()) { - DataShard.PromoteCompleteEdge(writeVersion.Step, txc); - } - if (auto changes = dataTx->GetCollectedChanges()) { op->ChangeRecords().reserve(changes.size()); for (const auto& change : changes) { diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index de0e9e4ad2..d468922853 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -20,6 +20,7 @@ public: const TActorContext &ctx) override; private: + TDataShard::TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(TOperation* op, TTransactionContext& txc); void CompleteRequest(TOperation::TPtr op, const TActorContext &ctx); void AddDiagnosticsResult(TOutputOpData::TResultPtr &res); @@ -43,6 +44,27 @@ bool TFinishProposeUnit::IsReadyToExecute(TOperation::TPtr) const return true; } +TDataShard::TPromotePostExecuteEdges TFinishProposeUnit::PromoteImmediatePostExecuteEdges( + TOperation* op, + TTransactionContext& txc) +{ + if (op->IsMvccSnapshotRead()) { + if (op->IsMvccSnapshotRepeatable()) { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::RepeatableRead, txc); + } else { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } + } else if (op->MvccReadWriteVersion) { + if (op->IsReadOnly()) { + return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } else { + return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); + } + } else { + return { }; + } +} + EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) @@ -53,24 +75,18 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, bool hadWrites = false; // When mvcc is enabled we perform marking after transaction is executed - if (DataShard.IsMvccEnabled() && op->IsImmediate()) { - if (op->IsMvccSnapshotRead()) { - hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(op->GetMvccSnapshot(), txc); - if (op->IsMvccSnapshotRepeatable()) { - hadWrites |= DataShard.PromoteCompleteEdge(op.Get(), txc); - if (!hadWrites && DataShard.GetSnapshotManager().GetCommittedCompleteEdge() < op->GetMvccSnapshot()) { - // We need to wait for completion because some other transaction - // has moved complete edge, but it's not committed yet. - op->SetWaitCompletionFlag(true); - } - } - } else if (op->MvccReadWriteVersion) { - hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(*op->MvccReadWriteVersion, txc); + if (op->IsAborted()) { + // Make sure we confirm aborts with a commit + op->SetWaitCompletionFlag(true); + } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) { + auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc); + + if (res.HadWrites) { + hadWrites = true; + res.WaitCompletion = true; } - if (hadWrites) { - // FIXME: even if transaction itself didn't promote, we may still need to - // wait for completion, when current in-memory state is not actually committed + if (res.WaitCompletion) { op->SetWaitCompletionFlag(true); } } @@ -159,8 +175,16 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size()); - if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) - ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie()); + if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { + if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) { + DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie()); + } else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) { + // TODO: we should actually measure a read timestamp and use it here + DataShard.SendImmediateReadResult(op->GetTarget(), res.Release(), op->GetCookie()); + } else { + ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie()); + } + } } void TFinishProposeUnit::AddDiagnosticsResult(TOutputOpData::TResultPtr &res) diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp index 851ff5424b..7dc3a46420 100644 --- a/ydb/core/tx/time_cast/time_cast.cpp +++ b/ydb/core/tx/time_cast/time_cast.cpp @@ -15,6 +15,9 @@ namespace NKikimr { +// We will unsubscribe from idle coordinators after 5 minutes +static constexpr TDuration MaxIdleCoordinatorSubscriptionTime = TDuration::Minutes(5); + ui64 TMediatorTimecastEntry::Get(ui64 tabletId) const { Y_UNUSED(tabletId); return AtomicGet(Step); @@ -72,9 +75,29 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { ui32 RefCount = 0; }; + struct TCoordinatorSubscriber { + TSet<ui64> Coordinators; + }; + + struct TCoordinator { + TMediatorTimecastReadStep::TPtr ReadStep = new TMediatorTimecastReadStep; + TActorId PipeClient; + ui64 LastSentSeqNo = 0; + ui64 LastConfirmedSeqNo = 0; + ui64 LastObservedReadStep = 0; + THashSet<TActorId> Subscribers; + TMap<std::pair<ui64, TActorId>, ui64> SubscribeRequests; // (seqno, subscriber) -> Cookie + TMap<std::pair<ui64, TActorId>, ui64> WaitRequests; // (step, subscriber) -> Cookie + TMonotonic IdleStart; + }; + THashMap<ui64, TMediator> Mediators; // mediator tablet -> info THashMap<ui64, TTabletInfo> Tablets; + ui64 LastSeqNo = 0; + THashMap<ui64, TCoordinator> Coordinators; + THashMap<TActorId, TCoordinatorSubscriber> CoordinatorSubscribers; + TMediator& MediatorInfo(ui64 mediator, const NKikimrSubDomains::TProcessingParams &processing) { auto pr = Mediators.try_emplace(mediator, processing.GetTimeCastBucketsPerMediator()); if (!pr.second) { @@ -118,19 +141,24 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { } void TryResync(const TActorId &pipeClient, ui64 tabletId, const TActorContext &ctx) { - for (auto &xpair : Mediators) { - const ui64 mediatorTabletId = xpair.first; - TMediator &mediator = xpair.second; - - if (mediator.PipeClient == pipeClient) { - Y_VERIFY(tabletId == mediatorTabletId); - mediator.PipeClient = TActorId(); - RegisterMediator(mediatorTabletId, mediator, ctx); - return; - } + ResyncCoordinator(tabletId, pipeClient, ctx); + + auto it = Mediators.find(tabletId); + if (it == Mediators.end()) { + return; + } + + TMediator &mediator = it->second; + if (mediator.PipeClient == pipeClient) { + mediator.PipeClient = TActorId(); + RegisterMediator(tabletId, mediator, ctx); } } + void SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx); + void ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx); + void NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvRegisterTablet::TPtr &ev, const TActorContext &ctx); void Handle(TEvMediatorTimecast::TEvUnregisterTablet::TPtr &ev, const TActorContext &ctx); void Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &ev, const TActorContext &ctx); @@ -138,6 +166,15 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx); + // Client requests for readstep subscriptions + void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx); + + // Coordinator replies for readstep subscriptions + void Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx); + void Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_MEDIATOR_TIMECAST_ACTOR; @@ -154,6 +191,13 @@ public: HFunc(TEvMediatorTimecast::TEvWaitPlanStep, Handle); HFunc(TEvMediatorTimecast::TEvUpdate, Handle); + HFunc(TEvMediatorTimecast::TEvSubscribeReadStep, Handle); + HFunc(TEvMediatorTimecast::TEvUnsubscribeReadStep, Handle); + HFunc(TEvMediatorTimecast::TEvWaitReadStep, Handle); + + HFunc(TEvTxProxy::TEvSubscribeReadStepResult, Handle); + HFunc(TEvTxProxy::TEvSubscribeReadStepUpdate, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); } @@ -280,13 +324,13 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co default: { const ui64 step = record.GetTimeBarrier(); bucket.Entry->Update(step, nullptr, 0); - THashSet<ui64> processed; // a set of processed tablets + THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets while (!bucket.Waiters.empty()) { const auto& top = bucket.Waiters.top(); if (step < top.PlanStep) { break; } - if (processed.insert(top.TabletId).second) { + if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) { ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step)); } bucket.Waiters.pop(); @@ -297,6 +341,224 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co } } +void TMediatorTimecastProxy::SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) { + const ui64 seqNo = LastSeqNo; + + if (!coordinator.PipeClient) { + ui64 maxDelay = 100 + TAppData::RandomProvider->GenRand64() % 50; + auto retryPolicy = NTabletPipe::TClientRetryPolicy{ + .RetryLimitCount = 6 /* delays: 0, 10, 20, 40, 80, 100-150 */, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(maxDelay), + }; + coordinator.PipeClient = ctx.RegisterWithSameMailbox( + NTabletPipe::CreateClient(ctx.SelfID, coordinatorId, retryPolicy)); + } + + coordinator.LastSentSeqNo = seqNo; + NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvSubscribeReadStep(coordinatorId, seqNo)); +} + +void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx) { + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + + auto &coordinator = itCoordinator->second; + if (coordinator.PipeClient != pipeClient) { + return; + } + + coordinator.PipeClient = TActorId(); + if (coordinator.Subscribers.empty()) { + // Just forget disconnected idle coordinators + Coordinators.erase(itCoordinator); + return; + } + + ++LastSeqNo; + SyncCoordinator(coordinatorId, coordinator, ctx); +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + const ui64 coordinatorId = msg->CoordinatorId; + auto &subscriber = CoordinatorSubscribers[ev->Sender]; + auto &coordinator = Coordinators[coordinatorId]; + subscriber.Coordinators.insert(coordinatorId); + coordinator.Subscribers.insert(ev->Sender); + ui64 seqNo = ++LastSeqNo; + auto key = std::make_pair(seqNo, ev->Sender); + coordinator.SubscribeRequests[key] = ev->Cookie; + SyncCoordinator(coordinatorId, coordinator, ctx); +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + auto &subscriber = CoordinatorSubscribers[ev->Sender]; + if (msg->CoordinatorId == 0) { + // Unsubscribe from all coordinators + for (ui64 coordinatorId : subscriber.Coordinators) { + auto &coordinator = Coordinators[coordinatorId]; + coordinator.Subscribers.erase(ev->Sender); + if (coordinator.Subscribers.empty()) { + coordinator.IdleStart = ctx.Monotonic(); + } + } + subscriber.Coordinators.clear(); + } else if (subscriber.Coordinators.contains(msg->CoordinatorId)) { + // Unsubscribe from specific coordinator + auto &coordinator = Coordinators[msg->CoordinatorId]; + coordinator.Subscribers.erase(ev->Sender); + if (coordinator.Subscribers.empty()) { + coordinator.IdleStart = ctx.Monotonic(); + } + subscriber.Coordinators.erase(msg->CoordinatorId); + } + + if (subscriber.Coordinators.empty()) { + // Don't track unnecessary subscribers + CoordinatorSubscribers.erase(ev->Sender); + } +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + const ui64 coordinatorId = msg->CoordinatorId; + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + if (!coordinator.Subscribers.contains(ev->Sender)) { + return; + } + + const ui64 step = coordinator.LastObservedReadStep; + const ui64 waitReadStep = msg->ReadStep; + if (waitReadStep <= step) { + ctx.Send(ev->Sender, + new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step), + 0, ev->Cookie); + return; + } + + auto key = std::make_pair(waitReadStep, ev->Sender); + coordinator.WaitRequests[key] = ev->Cookie; +} + +void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx) { + const auto &record = ev->Get()->Record; + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE TEvSubscribeReadStepResult " << record.ShortDebugString()); + + const ui64 coordinatorId = record.GetCoordinatorID(); + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + bool updated = false; + const ui64 nextReadStep = record.GetNextAcquireStep(); + if (coordinator.LastObservedReadStep < nextReadStep) { + coordinator.LastObservedReadStep = nextReadStep; + coordinator.ReadStep->Update(nextReadStep); + updated = true; + } + + const ui64 seqNo = record.GetSeqNo(); + if (coordinator.LastConfirmedSeqNo < seqNo) { + coordinator.LastConfirmedSeqNo = seqNo; + + const ui64 lastReadStep = record.GetLastAcquireStep(); + for (auto it = coordinator.SubscribeRequests.begin(); it != coordinator.SubscribeRequests.end();) { + const ui64 waitSeqNo = it->first.first; + if (seqNo < waitSeqNo) { + break; + } + const TActorId subscriberId = it->first.second; + const ui64 cookie = it->second; + if (coordinator.Subscribers.contains(subscriberId)) { + ctx.Send(subscriberId, + new TEvMediatorTimecast::TEvSubscribeReadStepResult( + coordinatorId, + lastReadStep, + nextReadStep, + coordinator.ReadStep), + 0, cookie); + } + it = coordinator.SubscribeRequests.erase(it); + } + } + + if (updated) { + NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx); + } +} + +void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx) { + const auto &record = ev->Get()->Record; + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE TEvSubscribeReadStepUpdate " << record.ShortDebugString()); + + const ui64 coordinatorId = record.GetCoordinatorID(); + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + const ui64 nextReadStep = record.GetNextAcquireStep(); + if (coordinator.LastObservedReadStep < nextReadStep) { + coordinator.LastObservedReadStep = nextReadStep; + coordinator.ReadStep->Update(nextReadStep); + + NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx); + } + + // Unsubscribe from idle coordinators + if (coordinator.Subscribers.empty() && (ctx.Monotonic() - coordinator.IdleStart) >= MaxIdleCoordinatorSubscriptionTime) { + if (coordinator.PipeClient) { + NTabletPipe::CloseClient(ctx, coordinator.PipeClient); + coordinator.PipeClient = TActorId(); + } + + Coordinators.erase(itCoordinator); + } +} + +void TMediatorTimecastProxy::NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) { + const ui64 step = coordinator.LastObservedReadStep; + for (auto it = coordinator.WaitRequests.begin(); it != coordinator.WaitRequests.end();) { + const ui64 waitStep = it->first.first; + if (step < waitStep) { + break; + } + const TActorId subscriberId = it->first.second; + const ui64 cookie = it->second; + if (coordinator.Subscribers.contains(subscriberId)) { + ctx.Send(subscriberId, + new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step), + 0, cookie); + } + it = coordinator.WaitRequests.erase(it); + } +} + + + IActor* CreateMediatorTimecastProxy() { return new TMediatorTimecastProxy(); } diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h index 1202d5a799..0765eae9c4 100644 --- a/ydb/core/tx/time_cast/time_cast.h +++ b/ydb/core/tx/time_cast/time_cast.h @@ -21,15 +21,41 @@ public: void Update(ui64 step, ui64 *exemption, ui64 exsz); }; +class TMediatorTimecastReadStep : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<TMediatorTimecastReadStep>; + using TCPtr = TIntrusiveConstPtr<TMediatorTimecastReadStep>; + + TMediatorTimecastReadStep(ui64 nextReadStep = 0) + : NextReadStep{ nextReadStep } + { } + + ui64 Get() const { + return NextReadStep.load(); + } + + void Update(ui64 nextReadStep) { + NextReadStep.store(nextReadStep); + } + +private: + std::atomic<ui64> NextReadStep; +}; + struct TEvMediatorTimecast { enum EEv { // local part EvRegisterTablet = EventSpaceBegin(TKikimrEvents::ES_TX_MEDIATORTIMECAST), EvUnregisterTablet, EvWaitPlanStep, + EvSubscribeReadStep, + EvUnsubscribeReadStep, + EvWaitReadStep, EvRegisterTabletResult = EvRegisterTablet + 1 * 512, EvNotifyPlanStep, + EvSubscribeReadStepResult, + EvNotifyReadStep, // mediator part EvWatch = EvRegisterTablet + 2 * 512, @@ -138,6 +164,104 @@ struct TEvMediatorTimecast { } }; + struct TEvSubscribeReadStep : public TEventLocal<TEvSubscribeReadStep, EvSubscribeReadStep> { + const ui64 CoordinatorId; + + explicit TEvSubscribeReadStep(ui64 coordinatorId) + : CoordinatorId(coordinatorId) + { + Y_VERIFY(coordinatorId != 0); + } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " }"; + } + }; + + struct TEvUnsubscribeReadStep : public TEventLocal<TEvUnsubscribeReadStep, EvUnsubscribeReadStep> { + const ui64 CoordinatorId; + + explicit TEvUnsubscribeReadStep(ui64 coordinatorId = 0) + : CoordinatorId(coordinatorId) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " }"; + } + }; + + struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> { + const ui64 CoordinatorId; + const ui64 LastReadStep; + const ui64 NextReadStep; + const TMediatorTimecastReadStep::TCPtr ReadStep; + + TEvSubscribeReadStepResult( + ui64 coordinatorId, + ui64 lastReadStep, + ui64 nextReadStep, + TMediatorTimecastReadStep::TCPtr readStep) + : CoordinatorId(coordinatorId) + , LastReadStep(lastReadStep) + , NextReadStep(nextReadStep) + , ReadStep(std::move(readStep)) + { + Y_VERIFY(ReadStep); + } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " LastReadStep# " << LastReadStep + << " NextReadStep# " << NextReadStep + << " ReadStep# " << ReadStep->Get() + << " }"; + } + }; + + struct TEvWaitReadStep : public TEventLocal<TEvWaitReadStep, EvWaitReadStep> { + const ui64 CoordinatorId; + const ui64 ReadStep; + + TEvWaitReadStep(ui64 coordinatorId, ui64 readStep) + : CoordinatorId(coordinatorId) + , ReadStep(readStep) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " ReadStep# " << ReadStep + << " }"; + } + }; + + struct TEvNotifyReadStep : public TEventLocal<TEvNotifyReadStep, EvNotifyReadStep> { + const ui64 CoordinatorId; + const ui64 ReadStep; + + TEvNotifyReadStep(ui64 coordinatorId, ui64 readStep) + : CoordinatorId(coordinatorId) + , ReadStep(readStep) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " ReadStep# " << ReadStep + << " }"; + } + }; + struct TEvWatch : public TEventPB<TEvWatch, NKikimrTxMediatorTimecast::TEvWatch, EvWatch> { TEvWatch() {} diff --git a/ydb/core/tx/time_cast/time_cast_ut.cpp b/ydb/core/tx/time_cast/time_cast_ut.cpp new file mode 100644 index 0000000000..0acb2ee128 --- /dev/null +++ b/ydb/core/tx/time_cast/time_cast_ut.cpp @@ -0,0 +1,84 @@ +#include "time_cast.h" +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/testlib/test_client.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NMediatorTimeCastTest { + + using namespace Tests; + + Y_UNIT_TEST_SUITE(MediatorTimeCast) { + + void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) { + auto sender = runtime.AllocateEdgeActor(); + runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration); + runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); + } + + void SendSubscribeRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 cookie = 0) { + auto request = MakeHolder<TEvMediatorTimecast::TEvSubscribeReadStep>(coordinatorId); + runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true); + } + + TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr WaitSubscribeResult(TTestActorRuntime& runtime, const TActorId& sender) { + return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvSubscribeReadStepResult>(sender); + } + + void SendWaitRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 readStep, ui64 cookie = 0) { + auto request = MakeHolder<TEvMediatorTimecast::TEvWaitReadStep>(coordinatorId, readStep); + runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true); + } + + TEvMediatorTimecast::TEvNotifyReadStep::TPtr WaitNotifyResult(TTestActorRuntime& runtime, const TActorId& sender) { + return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvNotifyReadStep>(sender); + } + + Y_UNIT_TEST(ReadStepSubscribe) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + + auto &runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NActors::NLog::PRI_DEBUG); + + auto sender = runtime.AllocateEdgeActor(); + ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain); + + SendSubscribeRequest(runtime, sender, coordinatorId); + auto result = WaitSubscribeResult(runtime, sender); + auto stepPtr = result->Get()->ReadStep; + + ui64 readStep1 = stepPtr->Get(); + UNIT_ASSERT_GE(readStep1, result->Get()->LastReadStep); + + SimulateSleep(runtime, TDuration::Seconds(5)); + + ui64 readStep2 = stepPtr->Get(); + UNIT_ASSERT_GT(readStep2, readStep1); + + ui64 waitStep = readStep2 + 2000; + SendWaitRequest(runtime, sender, coordinatorId, waitStep); + + { + auto notify = WaitNotifyResult(runtime, sender); + UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep); + UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get()); + } + + ui64 waitStep2 = stepPtr->Get() + 5000; + RebootTablet(runtime, coordinatorId, sender); + SendWaitRequest(runtime, sender, coordinatorId, waitStep2); + + { + auto notify = WaitNotifyResult(runtime, sender); + UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep2); + UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get()); + } + } + + } // Y_UNIT_TEST_SUITE(MediatorTimeCast) + +} // namespace NKikimr::NMediatorTimeCastTest diff --git a/ydb/core/tx/time_cast/ut/ya.make b/ydb/core/tx/time_cast/ut/ya.make new file mode 100644 index 0000000000..1f055a3e2e --- /dev/null +++ b/ydb/core/tx/time_cast/ut/ya.make @@ -0,0 +1,26 @@ +UNITTEST_FOR(ydb/core/tx/time_cast) + +OWNER(g:kikimr) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) + REQUIREMENTS(ram:16) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/testlib + ydb/core/tx +) + +YQL_LAST_ABI_VERSION() + +SRCS( + time_cast_ut.cpp +) + +END() diff --git a/ydb/core/tx/time_cast/ya.make b/ydb/core/tx/time_cast/ya.make index 9c059ac2b9..e8c260833e 100644 --- a/ydb/core/tx/time_cast/ya.make +++ b/ydb/core/tx/time_cast/ya.make @@ -18,3 +18,7 @@ PEERDIR( ) END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/util/testactorsys.cpp b/ydb/core/util/testactorsys.cpp index 73d27bc9d9..4a270e4f8c 100644 --- a/ydb/core/util/testactorsys.cpp +++ b/ydb/core/util/testactorsys.cpp @@ -208,4 +208,12 @@ TIntrusivePtr<ITimeProvider> TTestActorSystem::CreateTimeProvider() { return MakeIntrusive<TTestActorTimeProvider>(); } +TIntrusivePtr<IMonotonicTimeProvider> TTestActorSystem::CreateMonotonicTimeProvider() { + class TTestActorMonotonicTimeProvider : public IMonotonicTimeProvider { + public: + TMonotonic Now() override { return TMonotonic::MicroSeconds(CurrentTestActorSystem->Clock.MicroSeconds()); } + }; + return MakeIntrusive<TTestActorMonotonicTimeProvider>(); +} + } diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h index ce6ad7be10..722d5a8163 100644 --- a/ydb/core/util/testactorsys.h +++ b/ydb/core/util/testactorsys.h @@ -187,6 +187,8 @@ public: Y_VERIFY(!CurrentTestActorSystem); CurrentTestActorSystem = this; + + AppData.MonotonicTimeProvider = CreateMonotonicTimeProvider(); } ~TTestActorSystem() { @@ -195,6 +197,7 @@ public: } static TIntrusivePtr<ITimeProvider> CreateTimeProvider(); + static TIntrusivePtr<IMonotonicTimeProvider> CreateMonotonicTimeProvider(); TAppData *GetAppData() { return &AppData; |