aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-04-18 18:40:28 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-18 18:40:28 +0300
commit9ada39f7f8bf02130d705ad98476449e5ff29443 (patch)
tree5f685fdb250fe15b40eec5b95a3817f2957d3f12
parent86304e375567b039455008cdc9b12cfc5f5c66de (diff)
downloadydb-9ada39f7f8bf02130d705ad98476449e5ff29443.tar.gz
22-2: Snapshot isolation with prioritized reads, KIKIMR-13910
Merge from trunk: r9171244, r9229795, r9244174, r9285534, r9300998, r9313872 REVIEW: 2435130 x-ydb-stable-ref: 9c878f6e7949737ee83323c98c1b7c64c372710e
-rw-r--r--library/cpp/actors/core/monotonic_provider.cpp16
-rw-r--r--library/cpp/actors/core/monotonic_provider.h14
-rw-r--r--library/cpp/actors/core/ya.make2
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp26
-rw-r--r--library/cpp/actors/testlib/test_runtime.h5
-rw-r--r--ydb/core/base/appdata.cpp1
-rw-r--r--ydb/core/base/appdata.h2
-rw-r--r--ydb/core/base/tablet.h91
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator.cpp57
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator.h3
-rw-r--r--ydb/core/cms/console/immediate_controls_configurator_ut.cpp3
-rw-r--r--ydb/core/cms/json_proxy_proto.h2
-rw-r--r--ydb/core/control/immediate_control_board_control.cpp7
-rw-r--r--ydb/core/control/immediate_control_board_control.h1
-rw-r--r--ydb/core/control/immediate_control_board_impl.cpp10
-rw-r--r--ydb/core/control/immediate_control_board_impl.h2
-rw-r--r--ydb/core/control/immediate_control_board_wrapper.h9
-rw-r--r--ydb/core/protos/config.proto36
-rw-r--r--ydb/core/protos/counters_coordinator.proto2
-rw-r--r--ydb/core/protos/tablet.proto14
-rw-r--r--ydb/core/tablet/tablet_req_rebuildhistory.cpp17
-rw-r--r--ydb/core/tablet/tablet_sys.cpp10
-rw-r--r--ydb/core/tablet_flat/flat_boot_cookie.h4
-rw-r--r--ydb/core/tablet_flat/flat_boot_lease.cpp110
-rw-r--r--ydb/core/tablet_flat/flat_exec_commit.h2
-rw-r--r--ydb/core/tablet_flat/flat_exec_commit_mgr.h1
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp180
-rw-r--r--ydb/core/tablet_flat/flat_executor.h40
-rw-r--r--ydb/core/tablet_flat/flat_executor.proto11
-rw-r--r--ydb/core/tablet_flat/flat_executor_bootlogic.cpp25
-rw-r--r--ydb/core/tablet_flat/flat_executor_bootlogic.h4
-rw-r--r--ydb/core/tablet_flat/flat_executor_leases_ut.cpp365
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.cpp12
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h8
-rw-r--r--ydb/core/tablet_flat/ut/ya.make2
-rw-r--r--ydb/core/tablet_flat/ya.make1
-rw-r--r--ydb/core/testlib/actors/test_runtime.cpp3
-rw-r--r--ydb/core/testlib/test_client.cpp6
-rw-r--r--ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp16
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.cpp23
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.h14
-rw-r--r--ydb/core/tx/datashard/datashard.cpp422
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp15
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.h1
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.h2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.cpp15
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.h8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.h2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h87
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp63
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h4
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.cpp97
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.h18
-rw-r--r--ydb/core/tx/datashard/datashard_txs.h3
-rw-r--r--ydb/core/tx/datashard/datashard_unsafe_upload.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h18
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp535
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp60
-rw-r--r--ydb/core/tx/time_cast/time_cast.cpp286
-rw-r--r--ydb/core/tx/time_cast/time_cast.h124
-rw-r--r--ydb/core/tx/time_cast/time_cast_ut.cpp84
-rw-r--r--ydb/core/tx/time_cast/ut/ya.make26
-rw-r--r--ydb/core/tx/time_cast/ya.make4
-rw-r--r--ydb/core/util/testactorsys.cpp8
-rw-r--r--ydb/core/util/testactorsys.h3
72 files changed, 2941 insertions, 166 deletions
diff --git a/library/cpp/actors/core/monotonic_provider.cpp b/library/cpp/actors/core/monotonic_provider.cpp
new file mode 100644
index 0000000000..fb3b656da6
--- /dev/null
+++ b/library/cpp/actors/core/monotonic_provider.cpp
@@ -0,0 +1,16 @@
+#include "monotonic_provider.h"
+
+namespace NActors {
+
+class TDefaultMonotonicTimeProvider : public IMonotonicTimeProvider {
+public:
+ TMonotonic Now() override {
+ return TMonotonic::Now();
+ }
+};
+
+TIntrusivePtr<IMonotonicTimeProvider> CreateDefaultMonotonicTimeProvider() {
+ return TIntrusivePtr<IMonotonicTimeProvider>(new TDefaultMonotonicTimeProvider);
+}
+
+} // namespace NActors
diff --git a/library/cpp/actors/core/monotonic_provider.h b/library/cpp/actors/core/monotonic_provider.h
new file mode 100644
index 0000000000..98e1203400
--- /dev/null
+++ b/library/cpp/actors/core/monotonic_provider.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "monotonic.h"
+
+namespace NActors {
+
+class IMonotonicTimeProvider : public TThrRefBase {
+public:
+ virtual TMonotonic Now() = 0;
+};
+
+TIntrusivePtr<IMonotonicTimeProvider> CreateDefaultMonotonicTimeProvider();
+
+} // namespace NActors
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 880a9d00db..2979020d68 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -83,6 +83,8 @@ SRCS(
mon_stats.h
monotonic.cpp
monotonic.h
+ monotonic_provider.cpp
+ monotonic_provider.h
worker_context.cpp
worker_context.h
probes.cpp
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 6fa25b9965..51d93ba6e9 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -233,6 +233,20 @@ namespace NActors {
TTestActorRuntimeBase& Runtime;
};
+ class TTestActorRuntimeBase::TMonotonicTimeProvider : public IMonotonicTimeProvider {
+ public:
+ TMonotonicTimeProvider(TTestActorRuntimeBase& runtime)
+ : Runtime(runtime)
+ { }
+
+ TMonotonic Now() override {
+ return Runtime.GetCurrentMonotonicTime();
+ }
+
+ private:
+ TTestActorRuntimeBase& Runtime;
+ };
+
class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
public:
TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
@@ -470,6 +484,7 @@ namespace NActors {
, NeedMonitoring(false)
, RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
, TimeProvider(new TTimeProvider(*this))
+ , MonotonicTimeProvider(new TMonotonicTimeProvider(*this))
, ShouldContinue()
, CurrentTimestamp(0)
, DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
@@ -797,6 +812,12 @@ namespace NActors {
return TInstant::MicroSeconds(CurrentTimestamp);
}
+ TMonotonic TTestActorRuntimeBase::GetCurrentMonotonicTime() const {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(!UseRealThreads);
+ return TMonotonic::MicroSeconds(CurrentTimestamp);
+ }
+
void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
static int counter = 0;
++counter;
@@ -823,6 +844,11 @@ namespace NActors {
return TimeProvider;
}
+ TIntrusivePtr<IMonotonicTimeProvider> TTestActorRuntimeBase::GetMonotonicTimeProvider() {
+ Y_VERIFY(!UseRealThreads);
+ return MonotonicTimeProvider;
+ }
+
ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
Y_VERIFY(index < NodeCount);
return FirstNodeId + index;
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 26e3b45c98..c14cde1b66 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/executor_thread.h>
#include <library/cpp/actors/core/mailbox.h>
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/actors/interconnect/poller_tcp.h>
#include <library/cpp/actors/interconnect/mock/ic_mock.h>
@@ -188,6 +189,7 @@ namespace NActors {
class TSchedulerThreadStub;
class TExecutorPoolStub;
class TTimeProvider;
+ class TMonotonicTimeProvider;
enum class EEventAction {
PROCESS,
@@ -229,7 +231,9 @@ namespace NActors {
void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
TIntrusivePtr<ITimeProvider> GetTimeProvider();
+ TIntrusivePtr<IMonotonicTimeProvider> GetMonotonicTimeProvider();
TInstant GetCurrentTime() const;
+ TMonotonic GetCurrentMonotonicTime() const;
void UpdateCurrentTime(TInstant newTime);
void AdvanceCurrentTime(TDuration duration);
void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
@@ -534,6 +538,7 @@ namespace NActors {
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
+ TIntrusivePtr<IMonotonicTimeProvider> MonotonicTimeProvider;
protected:
struct TNodeDataBase: public TThrRefBase {
diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp
index f9e517fc42..b2b77e9e88 100644
--- a/ydb/core/base/appdata.cpp
+++ b/ydb/core/base/appdata.cpp
@@ -19,6 +19,7 @@ TAppData::TAppData(
, TypeRegistry(typeRegistry)
, FunctionRegistry(functionRegistry)
, FormatFactory(formatFactory)
+ , MonotonicTimeProvider(CreateDefaultMonotonicTimeProvider())
, ProxySchemeCacheNodes(Max<ui64>() / 4)
, ProxySchemeCacheDistrNodes(Max<ui64>() / 4)
, CompilerSchemeCachePaths(Max<ui64>() / 4)
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index 0a91d2560e..d6c0075e62 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -21,6 +21,7 @@
#include <library/cpp/actors/interconnect/poller_tcp.h>
#include <library/cpp/actors/core/executor_thread.h>
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/time_provider/time_provider.h>
@@ -116,6 +117,7 @@ struct TAppData {
static TIntrusivePtr<IRandomProvider> RandomProvider;
static TIntrusivePtr<ITimeProvider> TimeProvider;
+ TIntrusivePtr<IMonotonicTimeProvider> MonotonicTimeProvider;
TIntrusivePtr<TDomainsInfo> DomainsInfo;
TIntrusivePtr<TChannelProfiles> ChannelProfiles;
TIntrusivePtr<TDynamicNameserviceConfig> DynamicNameserviceConfig;
diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h
index 602e39c600..3b97f4fe09 100644
--- a/ydb/core/base/tablet.h
+++ b/ydb/core/base/tablet.h
@@ -52,6 +52,7 @@ struct TEvTablet {
EvFollowerSyncComplete, // from leader to user tablet when all old followers are touched and synced
EvCutTabletHistory,
EvUpdateConfig,
+ EvDropLease,
EvCommit = EvBoot + 512,
EvAux,
@@ -60,6 +61,7 @@ struct TEvTablet {
EvTabletActive,
EvPromoteToLeader,
EvFGcAck, // from user tablet to follower
+ EvLeaseDropped,
EvTabletDead = EvBoot + 1024,
EvFollowerUpdateState, // notifications to guardian
@@ -101,6 +103,16 @@ struct TEvTablet {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET)");
+ struct TCommitMetadata {
+ ui32 Key;
+ TString Data;
+
+ TCommitMetadata(ui32 key, TString data)
+ : Key(key)
+ , Data(std::move(data))
+ { }
+ };
+
struct TDependencyGraph : public TThrRefBase {
struct TEntry {
std::pair<ui32, ui32> Id;
@@ -111,28 +123,29 @@ struct TEvTablet {
TVector<TLogoBlobID> GcLeft;
TString EmbeddedLogBody;
-
- TEntry()
- : IsSnapshot(false)
- {}
-
- void Set(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &refs, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Id = id;
- References.swap(refs);
- IsSnapshot = isSnapshot;
- GcDiscovered.swap(gcDiscovered);
- GcLeft.swap(gcLeft);
- EmbeddedLogBody.clear();
- }
-
- void Set(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Id = id;
- References.clear();
- IsSnapshot = false;
- GcDiscovered.swap(gcDiscovered);
- GcLeft.swap(gcLeft);
- EmbeddedLogBody = embeddedLogBody;
- }
+ TVector<TCommitMetadata> EmbeddedMetadata;
+
+ TEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ : Id(id)
+ , IsSnapshot(isSnapshot)
+ , References(std::move(refs))
+ , GcDiscovered(std::move(gcDiscovered))
+ , GcLeft(std::move(gcLeft))
+ , EmbeddedMetadata(std::move(metadata))
+ { }
+
+ TEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ : Id(id)
+ , IsSnapshot(false)
+ , GcDiscovered(std::move(gcDiscovered))
+ , GcLeft(std::move(gcLeft))
+ , EmbeddedLogBody(std::move(embeddedLogBody))
+ , EmbeddedMetadata(std::move(metadata))
+ { }
};
std::pair<ui32, ui32> Snapshot;
@@ -148,19 +161,23 @@ struct TEvTablet {
<< " entries " << Entries.size() << "}";
}
- void AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &references, bool isSnapshot, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
+ TEntry& AddEntry(const std::pair<ui32, ui32> &id, TVector<TLogoBlobID> &&refs, bool isSnapshot,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ {
if (isSnapshot) {
Snapshot = id;
Entries.clear();
}
- Entries.push_back(TEntry());
- Entries.back().Set(id, references, isSnapshot, gcDiscovered, gcLeft);
+ return Entries.emplace_back(id, std::move(refs), isSnapshot, std::move(gcDiscovered), std::move(gcLeft), std::move(metadata));
}
- void AddEntry(const std::pair<ui32, ui32> &id, const TString &embeddedLogBody, TVector<TLogoBlobID> &gcDiscovered, TVector<TLogoBlobID> &gcLeft) {
- Entries.push_back(TEntry());
- Entries.back().Set(id, embeddedLogBody, gcDiscovered, gcLeft);
+ TEntry& AddEntry(const std::pair<ui32, ui32> &id, TString embeddedLogBody,
+ TVector<TLogoBlobID> &&gcDiscovered, TVector<TLogoBlobID> &&gcLeft,
+ TVector<TCommitMetadata> &&metadata)
+ {
+ return Entries.emplace_back(id, std::move(embeddedLogBody), std::move(gcDiscovered), std::move(gcLeft), std::move(metadata));
}
void Invalidate() {
@@ -269,6 +286,8 @@ struct TEvTablet {
TString EmbeddedLogBody;
TString FollowerAux;
+ TVector<TCommitMetadata> EmbeddedMetadata;
+
TEvCommit(ui64 tabletId, ui32 gen, ui32 step, const TVector<ui32> &dependsOn, bool isSnapshot
, bool preCommited = false
, TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticMinLatency)
@@ -777,6 +796,22 @@ struct TEvTablet {
};
struct TEvTabletStopped : TEventLocal<TEvTabletStopped, EvTabletStopped> {};
+
+ struct TEvDropLease : TEventPB<TEvDropLease, NKikimrTabletBase::TEvDropLease, EvDropLease> {
+ TEvDropLease() = default;
+
+ explicit TEvDropLease(ui64 tabletId) {
+ Record.SetTabletID(tabletId);
+ }
+ };
+
+ struct TEvLeaseDropped : TEventPB<TEvLeaseDropped, NKikimrTabletBase::TEvLeaseDropped, EvLeaseDropped> {
+ TEvLeaseDropped() = default;
+
+ explicit TEvLeaseDropped(ui64 tabletId) {
+ Record.SetTabletID(tabletId);
+ }
+ };
};
IActor* CreateTabletKiller(ui64 tabletId, ui32 nodeId = 0, ui32 maxGeneration = Max<ui32>());
diff --git a/ydb/core/cms/console/immediate_controls_configurator.cpp b/ydb/core/cms/console/immediate_controls_configurator.cpp
index af3ad24217..4f959a1891 100644
--- a/ydb/core/cms/console/immediate_controls_configurator.cpp
+++ b/ydb/core/cms/console/immediate_controls_configurator.cpp
@@ -18,7 +18,8 @@ public:
}
TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg);
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls);
void Bootstrap(const TActorContext &ctx);
@@ -38,13 +39,15 @@ public:
}
private:
- void CreateControls(TIntrusivePtr<TControlBoard> board);
+ void CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting);
void CreateControls(TIntrusivePtr<TControlBoard> board,
const google::protobuf::Descriptor *desc,
- const TString &prefix);
+ const TString &prefix,
+ bool allowExisting);
void AddControl(TIntrusivePtr<TControlBoard> board,
const google::protobuf::FieldDescriptor *desc,
- const TString &prefix);
+ const TString &prefix,
+ bool allowExisting);
void ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg,
TIntrusivePtr<TControlBoard> board);
void ApplyConfig(const ::google::protobuf::Message &cfg,
@@ -57,9 +60,10 @@ private:
};
TImmediateControlsConfigurator::TImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg)
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls)
{
- CreateControls(board);
+ CreateControls(board, allowExistingControls);
ApplyConfig(cfg, board);
}
@@ -98,15 +102,16 @@ void TImmediateControlsConfigurator::Handle(TEvConsole::TEvConfigNotificationReq
ctx.Send(ev->Sender, resp.Release(), 0, ev->Cookie);
}
-void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board)
+void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board, bool allowExisting)
{
auto *desc = NKikimrConfig::TImmediateControlsConfig::descriptor();
- CreateControls(board, desc, "");
+ CreateControls(board, desc, "", allowExisting);
}
void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard> board,
const google::protobuf::Descriptor *desc,
- const TString &prefix)
+ const TString &prefix,
+ bool allowExisting)
{
for (int i = 0; i < desc->field_count(); ++i) {
auto *fieldDesc = desc->field(i);
@@ -117,20 +122,22 @@ void TImmediateControlsConfigurator::CreateControls(TIntrusivePtr<TControlBoard>
auto fieldType = fieldDesc->type();
if (fieldType == google::protobuf::FieldDescriptor::TYPE_UINT64
|| fieldType == google::protobuf::FieldDescriptor::TYPE_INT64)
- AddControl(board, fieldDesc, prefix);
+ AddControl(board, fieldDesc, prefix, allowExisting);
else {
Y_VERIFY(fieldType == google::protobuf::FieldDescriptor::TYPE_MESSAGE,
"Only [u]int64 and message fields are allowed in Immediate Controls Config");
CreateControls(board, fieldDesc->message_type(),
- MakePrefix(prefix, fieldDesc->name()));
+ MakePrefix(prefix, fieldDesc->name()),
+ allowExisting);
}
}
}
void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> board,
const google::protobuf::FieldDescriptor *desc,
- const TString &prefix)
+ const TString &prefix,
+ bool allowExisting)
{
auto &opts = desc->options().GetExtension(NKikimrConfig::ControlOptions);
auto name = MakePrefix(prefix, desc->name());
@@ -139,11 +146,22 @@ void TImmediateControlsConfigurator::AddControl(TIntrusivePtr<TControlBoard> boa
ui64 maxValue = opts.GetMaxValue();
Controls[name] = TControlWrapper(defaultValue, minValue, maxValue);
- // All controls are actually shared but use RegisterLocalControl to
- // make sure it is not registered by someone else with other options.
- auto res = board->RegisterLocalControl(Controls[name], name);
- Y_VERIFY_S(res, "Immediate Control " << name << " was registered before "
- << "TImmediateControlsConfigurator creation");
+
+ // When we register control it is possible that is has already been
+ // registered by some other code, in which case it may be used before it
+ // is properly configured. It can currently only happen in configurator
+ // tests, where it is created very late after some tablets have already
+ // started.
+ auto res = board->RegisterSharedControl(Controls[name], name);
+ Y_VERIFY_S(res || allowExisting,
+ "Immediate Control " << name << " was registered before "
+ << "TImmediateControlsConfigurator creation");
+ if (Y_UNLIKELY(!res)) {
+ Cerr << "WARNING: immediate control " << name << " was registered before "
+ << "TImmediateControlsConfigurator creation. "
+ << "A default value may have been used before it was configured." << Endl;
+ Controls[name].Reset(defaultValue, minValue, maxValue);
+ }
}
void TImmediateControlsConfigurator::ApplyConfig(const NKikimrConfig::TImmediateControlsConfig &cfg,
@@ -192,9 +210,10 @@ TString TImmediateControlsConfigurator::MakePrefix(const TString &prefix,
}
IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg)
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls)
{
- return new TImmediateControlsConfigurator(board, cfg);
+ return new TImmediateControlsConfigurator(board, cfg, allowExistingControls);
}
} // namespace NConsole
diff --git a/ydb/core/cms/console/immediate_controls_configurator.h b/ydb/core/cms/console/immediate_controls_configurator.h
index c6c4ba029d..53f1b3e5d7 100644
--- a/ydb/core/cms/console/immediate_controls_configurator.h
+++ b/ydb/core/cms/console/immediate_controls_configurator.h
@@ -13,7 +13,8 @@ namespace NConsole {
* immediate control board via CMS.
*/
IActor *CreateImmediateControlsConfigurator(TIntrusivePtr<TControlBoard> board,
- const NKikimrConfig::TImmediateControlsConfig &cfg);
+ const NKikimrConfig::TImmediateControlsConfig &cfg,
+ bool allowExistingControls = false);
} // namespace NConsole
} // namespace NKikimr
diff --git a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
index 8ad8f6ed1b..2d2a9f8251 100644
--- a/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
+++ b/ydb/core/cms/console/immediate_controls_configurator_ut.cpp
@@ -57,7 +57,8 @@ NKikimrConsole::TConfigItem ITEM_CONTROLS_EXCEED_MAX;
void InitImmediateControlsConfigurator(TTenantTestRuntime &runtime)
{
runtime.Register(CreateImmediateControlsConfigurator(runtime.GetAppData().Icb,
- NKikimrConfig::TImmediateControlsConfig()));
+ NKikimrConfig::TImmediateControlsConfig(),
+ /* allowExistingControls */ true));
TDispatchOptions options;
options.FinalEvents.emplace_back(TEvConfigsDispatcher::EvSetConfigSubscriptionResponse, 1);
runtime.DispatchEvents(options);
diff --git a/ydb/core/cms/json_proxy_proto.h b/ydb/core/cms/json_proxy_proto.h
index b1b87f30cd..9c66617066 100644
--- a/ydb/core/cms/json_proxy_proto.h
+++ b/ydb/core/cms/json_proxy_proto.h
@@ -73,6 +73,8 @@ protected:
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TDataShardControls::TExecutionProfileOptions::descriptor(), ctx);
else if (name == ".NKikimrConfig.TImmediateControlsConfig.TTxLimitControls")
return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TTxLimitControls::descriptor(), ctx);
+ else if (name == ".NKikimrConfig.TImmediateControlsConfig.TCoordinatorControls")
+ return ReplyWithTypeDescription(*NKikimrConfig::TImmediateControlsConfig::TCoordinatorControls::descriptor(), ctx);
}
ctx.Send(RequestEvent->Sender,
diff --git a/ydb/core/control/immediate_control_board_control.cpp b/ydb/core/control/immediate_control_board_control.cpp
index 7762cf9d88..6acb68e985 100644
--- a/ydb/core/control/immediate_control_board_control.cpp
+++ b/ydb/core/control/immediate_control_board_control.cpp
@@ -15,6 +15,13 @@ void TControl::Set(TAtomicBase newValue) {
AtomicSet(Default, newValue);
}
+void TControl::Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) {
+ Value = defaultValue;
+ Default = defaultValue;
+ LowerBound = lowerBound;
+ UpperBound = upperBound;
+}
+
TAtomicBase TControl::SetFromHtmlRequest(TAtomicBase newValue) {
TAtomicBase prevValue = AtomicGet(Value);
if (newValue == AtomicGet(Default)) {
diff --git a/ydb/core/control/immediate_control_board_control.h b/ydb/core/control/immediate_control_board_control.h
index 7fd038b03a..381b9cb0fc 100644
--- a/ydb/core/control/immediate_control_board_control.h
+++ b/ydb/core/control/immediate_control_board_control.h
@@ -16,6 +16,7 @@ public:
TControl(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound);
void Set(TAtomicBase newValue);
+ void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound);
TAtomicBase SetFromHtmlRequest(TAtomicBase newValue);
diff --git a/ydb/core/control/immediate_control_board_impl.cpp b/ydb/core/control/immediate_control_board_impl.cpp
index fa26926fae..06f6dab1a8 100644
--- a/ydb/core/control/immediate_control_board_impl.cpp
+++ b/ydb/core/control/immediate_control_board_impl.cpp
@@ -15,8 +15,14 @@ bool TControlBoard::RegisterLocalControl(TControlWrapper control, TString name)
return result;
}
-void TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) {
- control.Control = Board.InsertIfAbsent(name, control.Control);
+bool TControlBoard::RegisterSharedControl(TControlWrapper& control, TString name) {
+ auto& ptr = Board.InsertIfAbsent(name, control.Control);
+ if (control.Control == ptr) {
+ return true;
+ } else {
+ control.Control = ptr;
+ return false;
+ }
}
void TControlBoard::RestoreDefaults() {
diff --git a/ydb/core/control/immediate_control_board_impl.h b/ydb/core/control/immediate_control_board_impl.h
index a01e09f4d6..f5463aa12d 100644
--- a/ydb/core/control/immediate_control_board_impl.h
+++ b/ydb/core/control/immediate_control_board_impl.h
@@ -14,7 +14,7 @@ private:
public:
bool RegisterLocalControl(TControlWrapper control, TString name);
- void RegisterSharedControl(TControlWrapper& control, TString name);
+ bool RegisterSharedControl(TControlWrapper& control, TString name);
void RestoreDefaults();
diff --git a/ydb/core/control/immediate_control_board_wrapper.h b/ydb/core/control/immediate_control_board_wrapper.h
index ce8a6adde5..e2da0d80d0 100644
--- a/ydb/core/control/immediate_control_board_wrapper.h
+++ b/ydb/core/control/immediate_control_board_wrapper.h
@@ -29,6 +29,15 @@ public:
bool IsTheSame(TControlWrapper another) {
return Control == another.Control;
}
+
+ /**
+ * Resets an existing control to different values.
+ *
+ * WARNING: this method is not thread safe and may only be used during initialization.
+ */
+ void Reset(TAtomicBase defaultValue, TAtomicBase lowerBound, TAtomicBase upperBound) {
+ Control->Reset(defaultValue, lowerBound, upperBound);
+ }
};
class TMemorizableControlWrapper {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ba911f7f4f..9cce061fe1 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1178,6 +1178,28 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];
+
+ optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = {
+ Description: "Enables prioritized mvcc snapshot reads over immediate writes",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = {
+ Description: "Enables unprotected (fully readonly) mvcc snapshot reads",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+
+ optional uint64 EnableLeaderLeases = 10 [(ControlOptions) = {
+ Description: "Enables leader leases for processing read-only queries",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 MinLeaderLeaseDurationUs = 11 [(ControlOptions) = {
+ Description: "The minimum leader lease duration in microseconds",
+ MinValue: 1000,
+ MaxValue: 5000000,
+ DefaultValue: 250000 }];
}
message TTxLimitControls {
@@ -1203,8 +1225,22 @@ message TImmediateControlsConfig {
DefaultValue: 60000 }];
}
+ message TCoordinatorControls {
+ optional uint64 EnableLeaderLeases = 1 [(ControlOptions) = {
+ Description: "Enables leader leases for processing read-only queries",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 MinLeaderLeaseDurationUs = 2 [(ControlOptions) = {
+ Description: "The minimum leader lease duration in microseconds",
+ MinValue: 1000,
+ MaxValue: 5000000,
+ DefaultValue: 250000 }];
+ }
+
optional TDataShardControls DataShardControls = 1;
optional TTxLimitControls TxLimitControls = 2;
+ optional TCoordinatorControls CoordinatorControls = 3;
};
message TMeteringConfig {
diff --git a/ydb/core/protos/counters_coordinator.proto b/ydb/core/protos/counters_coordinator.proto
index 28433e3e1a..f5437245bb 100644
--- a/ydb/core/protos/counters_coordinator.proto
+++ b/ydb/core/protos/counters_coordinator.proto
@@ -12,7 +12,7 @@ enum ESimpleCounters {
}
enum ECumulativeCounters {
- COUNTER_CUMULATIVE_IGNORE = 0;
+ COUNTER_REQ_ACQUIRE_READ_STEP = 0 [(CounterOpts) = {Name: "AcquireReadStepRequests"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto
index f0bec238e7..9e51d432da 100644
--- a/ydb/core/protos/tablet.proto
+++ b/ydb/core/protos/tablet.proto
@@ -60,6 +60,11 @@ message TTabletTypes {
}
}
+message TTabletLogMetadata {
+ optional uint32 Key = 1;
+ optional bytes Data = 2;
+}
+
message TTabletLogEntry {
// normal log entries
repeated uint32 DependsOn = 1;
@@ -79,6 +84,7 @@ message TTabletLogEntry {
repeated fixed64 ZeroTailBitmask = 12;
optional bytes EmbeddedLogBody = 13;
+ repeated TTabletLogMetadata EmbeddedMetadata = 14;
}
message TTabletChannelInfo {
@@ -238,3 +244,11 @@ message TEvTabletStop {
optional uint64 TabletID = 1;
optional EReason Reason = 2;
}
+
+message TEvDropLease {
+ optional uint64 TabletID = 1;
+}
+
+message TEvLeaseDropped {
+ optional uint64 TabletID = 1;
+}
diff --git a/ydb/core/tablet/tablet_req_rebuildhistory.cpp b/ydb/core/tablet/tablet_req_rebuildhistory.cpp
index c0b4c0c2bc..620b25c1a8 100644
--- a/ydb/core/tablet/tablet_req_rebuildhistory.cpp
+++ b/ydb/core/tablet/tablet_req_rebuildhistory.cpp
@@ -44,6 +44,7 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
bool IsSnapshot;
bool IsTotalSnapshot;
TString EmbeddedLogBody;
+ TVector<TEvTablet::TCommitMetadata> EmbeddedMetadata;
TVector<TLogoBlobID> GcDiscovered;
TVector<TLogoBlobID> GcLeft;
@@ -125,6 +126,14 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
GcLeft[i] = LogoBlobIDFromLogoBlobID(x.GetGcLeft(i));
}
+ if (const size_t metaSize = x.EmbeddedMetadataSize()) {
+ EmbeddedMetadata.reserve(metaSize);
+ for (size_t i = 0; i < metaSize; ++i) {
+ const auto& meta = x.GetEmbeddedMetadata(i);
+ EmbeddedMetadata.emplace_back(meta.GetKey(), meta.GetData());
+ }
+ }
+
switch (Status) {
case StatusUnknown:
Status = StatusBody;
@@ -679,9 +688,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
}());
if (entry.EmbeddedLogBody)
- graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
else
- graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
if (lastUnbrokenTailEntry + 1 == id.second)
lastUnbrokenTailEntry = id.second;
@@ -717,9 +726,9 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
}());
if (entry.EmbeddedLogBody)
- graph->AddEntry(id, entry.EmbeddedLogBody, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.EmbeddedLogBody), std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
else
- graph->AddEntry(id, entry.References, entry.IsSnapshot, entry.GcDiscovered, entry.GcLeft);
+ graph->AddEntry(id, std::move(entry.References), entry.IsSnapshot, std::move(entry.GcDiscovered), std::move(entry.GcLeft), std::move(entry.EmbeddedMetadata));
hasSnapshotInGeneration |= entry.IsSnapshot;
break;
diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp
index 3f3081f128..4537a4af91 100644
--- a/ydb/core/tablet/tablet_sys.cpp
+++ b/ydb/core/tablet/tablet_sys.cpp
@@ -1164,6 +1164,16 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) {
entry->FollowerUpdate->Body = msg->EmbeddedLogBody;
}
+ if (!msg->EmbeddedMetadata.empty()) {
+ auto *m = x->MutableEmbeddedMetadata();
+ m->Reserve(msg->EmbeddedMetadata.size());
+ for (const auto &meta : msg->EmbeddedMetadata) {
+ auto *p = m->Add();
+ p->SetKey(meta.Key);
+ p->SetData(meta.Data);
+ }
+ }
+
if (saveFollowerUpdate && msg->FollowerAux)
entry->FollowerUpdate->AuxPayload = msg->FollowerAux;
diff --git a/ydb/core/tablet_flat/flat_boot_cookie.h b/ydb/core/tablet_flat/flat_boot_cookie.h
index 58e810325a..4ff737c2bd 100644
--- a/ydb/core/tablet_flat/flat_boot_cookie.h
+++ b/ydb/core/tablet_flat/flat_boot_cookie.h
@@ -68,6 +68,10 @@ namespace NBoot {
const ui32 Raw = 0; /* only lower 24 bits are used */
};
+ enum class ELogCommitMeta : ui32 {
+ LeaseInfo = 1,
+ };
+
}
}
}
diff --git a/ydb/core/tablet_flat/flat_boot_lease.cpp b/ydb/core/tablet_flat/flat_boot_lease.cpp
new file mode 100644
index 0000000000..0fe6966b49
--- /dev/null
+++ b/ydb/core/tablet_flat/flat_boot_lease.cpp
@@ -0,0 +1,110 @@
+#include "flat_executor_bootlogic.h"
+#include <ydb/core/util/pb.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+
+namespace NKikimr {
+namespace NTabletFlatExecutor {
+
+class TLeaseWaiter : public TActorBootstrapped<TLeaseWaiter> {
+public:
+ TLeaseWaiter(
+ const TActorId& owner,
+ TMonotonic bootTimestamp,
+ const TActorId& leaseHolder,
+ TDuration leaseDuration)
+ : Owner(owner)
+ , BootTimestamp(bootTimestamp)
+ , LeaseHolder(leaseHolder)
+ , LeaseDuration(leaseDuration)
+ { }
+
+ void Bootstrap() {
+ SendRequest();
+ Schedule(BootTimestamp + LeaseDuration * 2, new TEvents::TEvWakeup);
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvents::TEvPoison, PassAway);
+ sFunc(TEvents::TEvWakeup, HandleTimeout);
+ hFunc(TEvTablet::TEvLeaseDropped, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ }
+ }
+
+ void SendRequest() {
+ Send(LeaseHolder, new TEvTablet::TEvDropLease(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
+ }
+
+ void Finish() {
+ Send(Owner, new TEvTablet::TEvLeaseDropped());
+ PassAway();
+ }
+
+ void HandleTimeout() {
+ Finish();
+ }
+
+ void Handle(TEvTablet::TEvLeaseDropped::TPtr&) {
+ Finish();
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr&) {
+ // We don't need to do anything
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ // Keep retrying until timeout is reached
+ SendRequest();
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ auto* msg = ev->Get();
+ Y_VERIFY(ev->Sender == LeaseHolder);
+ Y_VERIFY(msg->SourceType == TEvTablet::TEvDropLease::EventType);
+ if (msg->Reason == TEvents::TEvUndelivered::ReasonActorUnknown) {
+ // We have proved lease holder no longer exists
+ return Finish();
+ }
+
+ // We may get undelivered notifications due to disconnections
+ // Since we cannot trust them we expect to retry on TEvNodeDisconnected
+ }
+
+private:
+ const TActorId Owner;
+ const TMonotonic BootTimestamp;
+ const TActorId LeaseHolder;
+ const TDuration LeaseDuration;
+};
+
+void TExecutorBootLogic::StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept
+{
+ TActorId leaseHolder;
+ TDuration leaseDuration;
+
+ for (const auto& entry : graph.Entries) {
+ for (const auto& meta : entry.EmbeddedMetadata) {
+ if (NBoot::ELogCommitMeta(meta.Key) == NBoot::ELogCommitMeta::LeaseInfo) {
+ TProtoBox<NKikimrExecutorFlat::TLeaseInfoMetadata> proto(meta.Data);
+ if (proto.HasLeaseHolder()) {
+ leaseHolder = ActorIdFromProto(proto.GetLeaseHolder());
+ }
+ if (proto.HasLeaseDurationUs()) {
+ leaseDuration = TDuration::MicroSeconds(proto.GetLeaseDurationUs());
+ }
+ }
+ }
+ }
+
+ if (leaseHolder && leaseDuration) {
+ LeaseWaiter = Ops->RegisterWithSameMailbox(new TLeaseWaiter(SelfId, bootTimestamp, leaseHolder, leaseDuration));
+ }
+}
+
+} // namespace NTabletFlatExecutor
+} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/flat_exec_commit.h b/ydb/core/tablet_flat/flat_exec_commit.h
index 7212449c25..a1402dcf24 100644
--- a/ydb/core/tablet_flat/flat_exec_commit.h
+++ b/ydb/core/tablet_flat/flat_exec_commit.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/base/logoblob.h>
+#include <ydb/core/base/tablet.h>
#include <util/generic/vector.h>
#include <util/generic/string.h>
@@ -53,6 +54,7 @@ namespace NTabletFlatExecutor {
TString FollowerAux;
TVector<TLogoBlob> Refs;
TGCBlobDelta GcDelta;
+ TVector<TEvTablet::TCommitMetadata> Metadata;
TSeat *FirstTx = nullptr;
TSeat *LastTx = nullptr;
};
diff --git a/ydb/core/tablet_flat/flat_exec_commit_mgr.h b/ydb/core/tablet_flat/flat_exec_commit_mgr.h
index 20f5d5ffe1..caeabb014f 100644
--- a/ydb/core/tablet_flat/flat_exec_commit_mgr.h
+++ b/ydb/core/tablet_flat/flat_exec_commit_mgr.h
@@ -175,6 +175,7 @@ namespace NTabletFlatExecutor {
ev->FollowerAux = std::move(commit.FollowerAux);
ev->GcDiscovered = std::move(commit.GcDelta.Created);
ev->GcLeft = std::move(commit.GcDelta.Deleted);
+ ev->EmbeddedMetadata = std::move(commit.Metadata);
Ops->Send(Owner, ev, 0, ui64(commit.Type));
}
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 164775ea47..91f3a137bc 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -408,6 +408,15 @@ void TExecutor::Active(const TActorContext &ctx) {
CompactionLogic->UpdateInMemStatsStep(it.first, 0, Database->GetTableMemSize(it.first));
UpdateCompactions();
+
+ LeaseEnabled = Owner->ReadOnlyLeaseEnabled();
+ if (LeaseEnabled) {
+ LeaseDuration = Owner->ReadOnlyLeaseDuration();
+ if (!LeaseDuration) {
+ LeaseEnabled = false;
+ }
+ }
+
MakeLogSnapshot();
if (auto error = CheckBorrowConsistency()) {
@@ -432,8 +441,6 @@ void TExecutor::TranscriptBootOpResult(ui32 res, const TActorContext &ctx) {
case TExecutorBootLogic::OpResultComplete:
return Active(ctx);
case TExecutorBootLogic::OpResultBroken:
- BootLogic.Destroy();
-
if (auto logl = Logger->Log(ELnLev::Error)) {
logl << NFmt::Do(*this) << " Broken while booting";
}
@@ -453,10 +460,10 @@ void TExecutor::TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ct
case TExecutorBootLogic::OpResultComplete:
return ActivateFollower(ctx);
case TExecutorBootLogic::OpResultBroken:
- BootLogic.Destroy();
if (auto logl = Logger->Log(ELnLev::Error)) {
logl << NFmt::Do(*this) << " Broken while follower booting";
}
+
return Broken();
default:
Y_FAIL("unknown boot result");
@@ -588,6 +595,7 @@ TExecutorCaches TExecutor::CleanupState() {
TExecutorCaches caches;
if (BootLogic) {
+ BootLogic->Cancel();
caches = BootLogic->DetachCaches();
} else {
if (PrivatePageCache) {
@@ -1395,6 +1403,85 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
}
+TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ Y_VERIFY(at >= LeaseEnd);
+
+ if (!LeaseEnabled) {
+ // Automatically enable leases
+ LeaseEnabled = true;
+ LeaseDuration = Owner->ReadOnlyLeaseDuration();
+ Y_VERIFY(LeaseDuration);
+ }
+
+ // Try to find a suitable commit that is already in flight
+ TLeaseCommit* lease = nullptr;
+ for (auto it = LeaseCommits.rbegin(); it != LeaseCommits.rend(); ++it) {
+ if (at < it->LeaseEnd) {
+ lease = &*it;
+ } else {
+ break;
+ }
+ }
+
+ if (!lease) {
+ if (LeaseDropped) {
+ // We cannot start new lease confirmations
+ return nullptr;
+ }
+
+ LogicRedo->FlushBatchedLog();
+
+ auto commit = CommitManager->Begin(true, ECommit::Misc);
+
+ NKikimrExecutorFlat::TLeaseInfoMetadata proto;
+ ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
+ proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
+
+ TString data;
+ bool ok = proto.SerializeToString(&data);
+ Y_VERIFY(ok);
+
+ commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
+
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+
+ CommitManager->Commit(commit);
+ }
+
+ return lease;
+}
+
+void TExecutor::ConfirmReadOnlyLease(TMonotonic at) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ LeaseUsed = true;
+
+ if (LeaseEnabled && at < LeaseEnd) {
+ return;
+ }
+
+ EnsureReadOnlyLease(at);
+}
+
+void TExecutor::ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) {
+ Y_VERIFY(Stats->IsActive && !Stats->IsFollower);
+ LeaseUsed = true;
+
+ if (LeaseEnabled && at < LeaseEnd) {
+ callback();
+ return;
+ }
+
+ if (auto* lease = EnsureReadOnlyLease(at)) {
+ lease->Callbacks.push_back(std::move(callback));
+ }
+}
+
+void TExecutor::ConfirmReadOnlyLease(std::function<void()> callback) {
+ ConfirmReadOnlyLease(AppData()->MonotonicTimeProvider->Now(), std::move(callback));
+}
+
bool TExecutor::CanExecuteTransaction() const {
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
}
@@ -2389,6 +2476,21 @@ void TExecutor::MakeLogSnapshot() {
GcLogic->SnapToLog(snap, commit->Step);
LogicSnap->MakeSnap(snap, *commit, Logger.Get());
+ if (LeaseEnabled) {
+ NKikimrExecutorFlat::TLeaseInfoMetadata proto;
+ ActorIdToProto(SelfId(), proto.MutableLeaseHolder());
+ proto.SetLeaseDurationUs(LeaseDuration.MicroSeconds());
+
+ TString data;
+ bool ok = proto.SerializeToString(&data);
+ Y_VERIFY(ok);
+
+ commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
+
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
+ }
+
CommitManager->Commit(commit);
CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());
@@ -2605,6 +2707,41 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) {
}
}
+void TExecutor::Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx) {
+ TMonotonic ts = AppData(ctx)->MonotonicTimeProvider->Now();
+
+ LeaseDropped = true;
+ LeaseEnd = Min(LeaseEnd, ts);
+
+ for (auto& l : LeaseCommits) {
+ l.LeaseEnd = Min(l.LeaseEnd, ts);
+ }
+
+ ctx.Send(ev->Sender, new TEvTablet::TEvLeaseDropped);
+ Owner->ReadOnlyLeaseDropped();
+}
+
+void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &) {
+ Y_VERIFY(LeaseExtendPending);
+ LeaseExtendPending = false;
+
+ if (!LeaseCommits.empty() || !LeaseEnabled || LeaseDropped) {
+ return;
+ }
+
+ if (LeaseUsed) {
+ LeaseUsed = false;
+ UnusedLeaseExtensions = 0;
+ } else if (UnusedLeaseExtensions >= 5) {
+ return;
+ } else {
+ ++UnusedLeaseExtensions;
+ }
+
+ // Start a new lease extension commit
+ EnsureReadOnlyLease(LeaseEnd);
+}
+
void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx) {
TEvTablet::TEvCommitResult *msg = ev->Get();
@@ -2631,6 +2768,41 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
<< " for step " << step;
}
+ if (!LeaseCommits.empty()) {
+ auto& l = LeaseCommits.front();
+ Y_VERIFY(step <= l.Step);
+ if (step == l.Step) {
+ LeasePersisted = true;
+ LeaseEnd = Max(LeaseEnd, l.LeaseEnd);
+
+ while (!l.Callbacks.empty()) {
+ // Note: callback may (though unlikely) recursively add more callbacks
+ TVector<std::function<void()>> callbacks;
+ callbacks.swap(l.Callbacks);
+ for (auto& callback : callbacks) {
+ callback();
+ }
+ }
+
+ LeaseCommits.pop_front();
+
+ // Calculate a full round-trip latency for leases
+ // When this latency is larger than third of lease duration we want
+ // to increase lease duration so we would have enough time for
+ // processing read-only requests without additional commits
+ TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
+ if ((LeaseEnd - ts) < LeaseDuration / 3) {
+ LeaseDuration *= 2;
+ }
+
+ // We want to schedule a new commit before the lease expires
+ if (LeaseCommits.empty() && !LeaseExtendPending) {
+ Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend);
+ LeaseExtendPending = true;
+ }
+ }
+ }
+
switch (cookie) {
case ECommit::Redo:
{
@@ -3562,11 +3734,13 @@ STFUNC(TExecutor::StateWork) {
CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters);
cFunc(TEvPrivate::EvCheckYellow, UpdateYellow);
cFunc(TEvPrivate::EvUpdateCompactions, UpdateCompactions);
+ HFunc(TEvPrivate::TEvLeaseExtend, Handle);
HFunc(TEvents::TEvWakeup, Wakeup);
hFunc(TEvents::TEvFlushLog, Handle);
hFunc(NSharedCache::TEvRequest, Handle);
hFunc(NSharedCache::TEvResult, Handle);
hFunc(NSharedCache::TEvUpdated, Handle);
+ HFunc(TEvTablet::TEvDropLease, Handle);
HFunc(TEvTablet::TEvCommitResult, Handle);
hFunc(TEvTablet::TEvCheckBlobstorageStatusResult, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 4c6f091560..9e72b436cf 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -334,6 +334,7 @@ class TExecutor
EvActivateCompactionRead,
EvActivateCompactionChanges,
EvBrokenTransaction,
+ EvLeaseExtend,
EvEnd
};
@@ -347,6 +348,7 @@ class TExecutor
struct TEvActivateCompactionRead : public TEventLocal<TEvActivateCompactionRead, EvActivateCompactionRead> {};
struct TEvActivateCompactionChanges : public TEventLocal<TEvActivateCompactionChanges, EvActivateCompactionChanges> {};
struct TEvBrokenTransaction : public TEventLocal<TEvBrokenTransaction, EvBrokenTransaction> {};
+ struct TEvLeaseExtend : public TEventLocal<TEvLeaseExtend, EvLeaseExtend> {};
};
const TIntrusivePtr<ITimeProvider> Time = nullptr;
@@ -356,6 +358,37 @@ class TExecutor
ui32 FollowerId = 0;
+ // This becomes true when executor enables the use of leases, e.g. starts persisting them
+ // This may become false again when leases are not actively used for some time
+ bool LeaseEnabled = false;
+ // As soon as lease is persisted we may theoretically use read-only checks for lease prolongation
+ bool LeasePersisted = false;
+ // When lease is dropped we must stop accepting new lease-dependent requests
+ bool LeaseDropped = false;
+ // When lease is used in any given cycle this becomes true
+ bool LeaseUsed = false;
+ // This flag marks when TEvLeaseExtend message is already pending
+ bool LeaseExtendPending = false;
+ TDuration LeaseDuration;
+ TMonotonic LeaseEnd;
+ // Counts the number of times an unused lease has been extended
+ size_t UnusedLeaseExtensions = 0;
+
+ struct TLeaseCommit {
+ const ui32 Step;
+ const TMonotonic Start;
+ TMonotonic LeaseEnd;
+ TVector<std::function<void()>> Callbacks;
+
+ TLeaseCommit(ui32 step, TMonotonic start, TMonotonic leaseEnd)
+ : Step(step)
+ , Start(start)
+ , LeaseEnd(leaseEnd)
+ { }
+ };
+
+ TList<TLeaseCommit> LeaseCommits;
+
using TActivationQueue = TOneOneQueueInplace<TSeat *, 64>;
THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> ActivationQueue;
THolder<TActivationQueue, TActivationQueue::TPtrCleanDestructor> PendingQueue;
@@ -502,6 +535,8 @@ class TExecutor
void ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch);
void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvTablet::TEvDropLease::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPrivate::TEvLeaseExtend::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx);
@@ -575,6 +610,11 @@ public:
void DetachTablet(const TActorContext &ctx) override;
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
+ TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
+ void ConfirmReadOnlyLease(TMonotonic at) override;
+ void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) override;
+ void ConfirmReadOnlyLease(std::function<void()> callback) override;
+
TString BorrowSnapshot(ui32 tableId, const TTableSnapshotContext& snap, TRawVals from, TRawVals to, ui64 loaner) const override;
ui64 MakeScanSnapshot(ui32 table) override;
diff --git a/ydb/core/tablet_flat/flat_executor.proto b/ydb/core/tablet_flat/flat_executor.proto
index 7758812187..0b865d16df 100644
--- a/ydb/core/tablet_flat/flat_executor.proto
+++ b/ydb/core/tablet_flat/flat_executor.proto
@@ -1,6 +1,7 @@
import "ydb/core/protos/base.proto";
import "ydb/core/protos/tablet.proto";
import "ydb/core/protos/flat_scheme_op.proto";
+import "library/cpp/actors/protos/actors.proto";
package NKikimrExecutorFlat;
option java_package = "ru.yandex.kikimr.proto";
@@ -247,3 +248,13 @@ message TDatabaseBorrowPart {
repeated TPartInfo Parts = 3;
repeated TTxStatus TxStatusParts = 4;
}
+
+message TLeaseInfoMetadata {
+ // Actor id of the lease holder, which is an executor actor
+ optional NActorsProto.TActorId LeaseHolder = 1;
+
+ // Lease duration in microseconds. Lease holder may perform leader-only
+ // tasks up to this duration since the last channel 0 block confirmation,
+ // and expects future generations will not interfere.
+ optional uint32 LeaseDurationUs = 2;
+}
diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
index cf72960f9c..4292a61d3c 100644
--- a/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_bootlogic.cpp
@@ -103,9 +103,14 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot(
TEvTablet::TEvBoot::TPtr &ev,
TExecutorCaches &&caches)
{
- PrepareEnv(false, ev->Get()->Generation, std::move(caches));
+ TEvTablet::TEvBoot *msg = ev->Get();
+ PrepareEnv(false, msg->Generation, std::move(caches));
- Steps->Spawn<NBoot::TStages>(std::move(ev->Get()->DependencyGraph), nullptr);
+ if (msg->DependencyGraph) {
+ StartLeaseWaiter(BootTimestamp, *msg->DependencyGraph);
+ }
+
+ Steps->Spawn<NBoot::TStages>(std::move(msg->DependencyGraph), nullptr);
Steps->Execute();
return CheckCompletion();
@@ -113,7 +118,7 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::ReceiveBoot(
void TExecutorBootLogic::PrepareEnv(bool follower, ui32 gen, TExecutorCaches caches) noexcept
{
- BootStartTime = TAppData::TimeProvider->Now();
+ BootTimestamp = AppData()->MonotonicTimeProvider->Now();
auto *sys = TlsActivationContext->ExecutorThread.ActorSystem;
auto *logger = new NUtil::TLogger(sys, NKikimrServices::TABLET_FLATBOOT);
@@ -212,9 +217,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::CheckCompletion()
if (Loads)
return OpResultContinue;
+ if (LeaseWaiter)
+ return OpResultContinue;
+
if (State().Follower || Restored) {
if (auto logl = Steps->Logger()->Log(ELnLev::Info)) {
- auto spent = TAppData::TimeProvider->Now() - BootStartTime;
+ auto spent = AppData()->MonotonicTimeProvider->Now() - BootTimestamp;
logl
<< NFmt::Do(State()) << " booting completed"
@@ -278,6 +286,12 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl
step.Drop();
Steps->Execute();
+ } else if (auto *msg = ev.CastAsLocal<TEvTablet::TEvLeaseDropped>()) {
+ if (LeaseWaiter != ev.Sender) {
+ return OpResultUnhandled;
+ }
+
+ LeaseWaiter = { };
} else {
return OpResultUnhandled;
}
@@ -291,6 +305,9 @@ TAutoPtr<NBoot::TResult> TExecutorBootLogic::ExtractState() noexcept {
}
void TExecutorBootLogic::Cancel() {
+ if (LeaseWaiter) {
+ Ops->Send(LeaseWaiter, new TEvents::TEvPoison);
+ }
}
void TExecutorBootLogic::FollowersSyncComplete() {
diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.h b/ydb/core/tablet_flat/flat_executor_bootlogic.h
index 34edb2d6b4..56084c36b0 100644
--- a/ydb/core/tablet_flat/flat_executor_bootlogic.h
+++ b/ydb/core/tablet_flat/flat_executor_bootlogic.h
@@ -74,8 +74,9 @@ private:
TAutoPtr<NBoot::TBack> State_;
TAutoPtr<NBoot::TResult> Result_;
TAutoPtr<NBoot::TRoot> Steps;
+ TActorId LeaseWaiter;
- TInstant BootStartTime;
+ TMonotonic BootTimestamp;
const TIntrusiveConstPtr<TTabletStorageInfo> Info;
@@ -91,6 +92,7 @@ private:
EOpResult CheckCompletion();
void PrepareEnv(bool follower, ui32 generation, TExecutorCaches caches) noexcept;
+ void StartLeaseWaiter(TMonotonic bootTimestamp, const TEvTablet::TDependencyGraph& graph) noexcept;
ui32 GetBSGroupFor(const TLogoBlobID &logo) const;
ui32 GetBSGroupID(ui32 channel, ui32 generation);
void LoadEntry(TIntrusivePtr<NBoot::TLoadBlobs>);
diff --git a/ydb/core/tablet_flat/flat_executor_leases_ut.cpp b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
new file mode 100644
index 0000000000..3761f4fcd0
--- /dev/null
+++ b/ydb/core/tablet_flat/flat_executor_leases_ut.cpp
@@ -0,0 +1,365 @@
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/base/statestorage.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NTabletFlatExecutor {
+
+Y_UNIT_TEST_SUITE(TFlatExecutorLeases) {
+
+ struct TEvLeasesTablet {
+ enum EEv {
+ EvWrite = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvWriteAck,
+ EvRead,
+ EvReadResult,
+ };
+
+ struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
+ TEvWrite(ui64 key, ui64 value)
+ : Key(key)
+ , Value(value)
+ { }
+
+ const ui64 Key;
+ const ui64 Value;
+ };
+
+ struct TEvWriteAck : public TEventLocal<TEvWriteAck, EvWriteAck> {
+ TEvWriteAck(ui64 key)
+ : Key(key)
+ { }
+
+ const ui64 Key;
+ };
+
+ struct TEvRead : public TEventLocal<TEvRead, EvRead> {
+ TEvRead(ui64 key)
+ : Key(key)
+ { }
+
+ const ui64 Key;
+ };
+
+ struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> {
+ TEvReadResult(ui64 key, ui64 value)
+ : Key(key)
+ , Value(value)
+ { }
+
+ const ui64 Key;
+ const ui64 Value;
+ };
+ };
+
+ class TLeasesTablet
+ : public TActor<TLeasesTablet>
+ , public TTabletExecutedFlat
+ {
+ public:
+ TLeasesTablet(const TActorId& tablet, TTabletStorageInfo* info, bool enableInitialLease)
+ : TActor(&TThis::StateInit)
+ , TTabletExecutedFlat(info, tablet, nullptr)
+ , EnableInitialLease(enableInitialLease)
+ { }
+
+ private:
+ struct Schema : NIceDb::Schema {
+ struct Data : Table<1> {
+ struct Key : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct Value : Column<2, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<Key>;
+ using TColumns = TableColumns<Key, Value>;
+ };
+
+ using TTables = SchemaTables<Data>;
+ };
+
+ using TTxBase = TTransactionBase<TLeasesTablet>;
+
+ struct TTxInitSchema : public TTxBase {
+ TTxInitSchema(TSelf* self)
+ : TTxBase(self)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ NIceDb::TNiceDb db(txc.DB);
+ db.Materialize<Schema>();
+ Self->RunTxInit();
+ return true;
+ }
+
+ void Complete(const TActorContext&) {
+ // nothing
+ }
+ };
+
+ void RunTxInitSchema() {
+ Execute(new TTxInitSchema(this));
+ }
+
+ struct TTxInit : public TTxBase {
+ TTxInit(TSelf* self)
+ : TTxBase(self)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ NIceDb::TNiceDb db(txc.DB);
+
+ auto rowset = db.Table<Schema::Data>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ ui64 key = rowset.GetValue<Schema::Data::Key>();
+ ui64 value = rowset.GetValue<Schema::Data::Value>();
+ Self->Data[key] = value;
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) {
+ Self->SwitchToWork(ctx);
+ }
+ };
+
+ void RunTxInit() {
+ Execute(new TTxInit(this));
+ }
+
+ struct TTxWrite : public TTxBase {
+ TTxWrite(TSelf* self, TEvLeasesTablet::TEvWrite::TPtr&& ev)
+ : TTxBase(self)
+ , Ev(std::move(ev))
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ auto* msg = Ev->Get();
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ db.Table<Schema::Data>().Key(msg->Key).Update(
+ NIceDb::TUpdate<Schema::Data::Value>(msg->Value));
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) {
+ auto* msg = Ev->Get();
+ Self->Data[msg->Key] = msg->Value;
+
+ ctx.Send(Ev->Sender, new TEvLeasesTablet::TEvWriteAck(msg->Key), 0, Ev->Cookie);
+ }
+
+ const TEvLeasesTablet::TEvWrite::TPtr Ev;
+ };
+
+ void Handle(TEvLeasesTablet::TEvWrite::TPtr& ev) {
+ Execute(new TTxWrite(this, std::move(ev)));
+ }
+
+ void Handle(TEvLeasesTablet::TEvRead::TPtr& ev) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto key = ev->Get()->Key;
+ auto value = Data[key];
+ Executor()->ConfirmReadOnlyLease([this, sender, cookie, key, value]() {
+ Send(sender, new TEvLeasesTablet::TEvReadResult(key, value), 0, cookie);
+ });
+ }
+
+ private:
+ void OnDetach(const TActorContext& ctx) override {
+ Die(ctx);
+ }
+
+ void OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext& ctx) override {
+ Die(ctx);
+ }
+
+ void OnActivateExecutor(const TActorContext&) override {
+ Become(&TThis::StateWork);
+ RunTxInitSchema();
+ }
+
+ void DefaultSignalTabletActive(const TActorContext&) override {
+ // nothing
+ }
+
+ void SwitchToWork(const TActorContext& ctx) {
+ SignalTabletActive(ctx);
+ }
+
+ bool ReadOnlyLeaseEnabled() override {
+ return EnableInitialLease;
+ }
+
+ private:
+ STFUNC(StateInit) {
+ StateInitImpl(ev, ctx);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvLeasesTablet::TEvWrite, Handle);
+ hFunc(TEvLeasesTablet::TEvRead, Handle);
+ default:
+ HandleDefaultEvents(ev, ctx);
+ }
+ }
+
+ private:
+ const bool EnableInitialLease;
+ THashMap<ui64, ui64> Data;
+ };
+
+ void DoBasics(bool deliverDropLease, bool enableInitialLease) {
+ TTestBasicRuntime runtime(2);
+ SetupTabletServices(runtime);
+ runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+
+ const ui64 tabletId = TTestTxConfig::TxTablet0;
+
+ auto boot1 = CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY),
+ [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) {
+ return new TLeasesTablet(tablet, info, enableInitialLease);
+ });
+ runtime.EnableScheduleForActor(boot1);
+
+ {
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1));
+ runtime.DispatchEvents(options);
+ }
+
+ auto sender = runtime.AllocateEdgeActor(0);
+ auto pipe1 = runtime.ConnectToPipe(tabletId, sender, 0, NTabletPipe::TClientRetryPolicy::WithRetries());
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvWrite(1, 11));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvWriteAck>(sender);
+ }
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ bool blockDropLease = true;
+ TVector<THolder<IEventHandle>> dropLeaseMsgs;
+ auto observerFunc = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTablet::TEvDropLease::EventType:
+ if (blockDropLease) {
+ dropLeaseMsgs.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ case TEvTablet::TEvTabletDead::EventType:
+ // Prevent tablets from restarting
+ // This is most important for the boot1 actor, since it
+ // quickly receives bad commit signal and tries to restart
+ // the original tablet. However we prevent executor from
+ // killing itself too, so we could make additional queries.
+ return TTestActorRuntime::EEventAction::DROP;
+ case TEvTablet::TEvDemoted::EventType:
+ // Block guardian from telling tablet about a new generation
+ return TTestActorRuntime::EEventAction::DROP;
+ case TEvStateStorage::TEvReplicaLeaderDemoted::EventType:
+ // Block replica from telling tablet about a new generation
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(observerFunc);
+
+ auto boot2 = CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(tabletId, TTabletTypes::TX_DUMMY),
+ [enableInitialLease](const TActorId & tablet, TTabletStorageInfo* info) {
+ return new TLeasesTablet(tablet, info, enableInitialLease);
+ },
+ /* node index */ 1);
+ runtime.EnableScheduleForActor(boot2);
+
+ {
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 1));
+ runtime.DispatchEvents(options);
+ }
+
+ // After the new tablet is restored, we should still be able to read from the old tablet for a while
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return dropLeaseMsgs.size() == 1; }, "drop lease message");
+
+ // We expect the new tablet to send a drop lease message, but the old tablet must still be readable, because it didn't receive it yet
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ if (deliverDropLease) {
+ blockDropLease = false;
+ for (auto& ev : dropLeaseMsgs) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ dropLeaseMsgs.clear();
+ }
+
+ auto sender2 = runtime.AllocateEdgeActor(1);
+ auto pipe2 = runtime.ConnectToPipe(tabletId, sender2, 1, NTabletPipe::TClientRetryPolicy::WithRetries());
+ runtime.SendToPipe(pipe2, sender2, new TEvLeasesTablet::TEvRead(1), 1);
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender2);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Value, 11u);
+ }
+
+ runtime.SendToPipe(pipe1, sender, new TEvLeasesTablet::TEvRead(1));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLeasesTablet::TEvReadResult>(sender, TDuration::Seconds(1));
+ UNIT_ASSERT(!ev);
+ }
+ }
+
+ Y_UNIT_TEST(Basics) {
+ DoBasics(true, false);
+ }
+
+ Y_UNIT_TEST(BasicsLeaseTimeout) {
+ DoBasics(false, false);
+ }
+
+ Y_UNIT_TEST(BasicsInitialLease) {
+ DoBasics(true, true);
+ }
+
+ Y_UNIT_TEST(BasicsInitialLeaseTimeout) {
+ DoBasics(false, true);
+ }
+}
+
+} // namespace NKikimr::NTabletFlatExecutor
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp
index f69991ece9..258addbf7f 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.cpp
+++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp
@@ -44,6 +44,18 @@ namespace NFlatExecutorSetup {
if (launcherID)
LauncherActorID = launcherID;
}
+
+ bool ITablet::ReadOnlyLeaseEnabled() {
+ return false;
+ }
+
+ TDuration ITablet::ReadOnlyLeaseDuration() {
+ return TDuration::MilliSeconds(250);
+ }
+
+ void ITablet::ReadOnlyLeaseDropped() {
+ // nothing by default
+ }
}
}}
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index 3d550f30ba..f628f6cd42 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -452,6 +452,10 @@ namespace NFlatExecutorSetup {
virtual void OnLeaderUserAuxUpdate(TString) { /* default */ }
+ virtual bool ReadOnlyLeaseEnabled();
+ virtual TDuration ReadOnlyLeaseDuration();
+ virtual void ReadOnlyLeaseDropped();
+
// create transaction?
protected:
ITablet(TTabletStorageInfo *info, const TActorId &tablet)
@@ -496,6 +500,10 @@ namespace NFlatExecutorSetup {
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
+ virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
+ virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;
+ virtual void ConfirmReadOnlyLease(std::function<void()> callback) = 0;
+
/* Make blob with data required for table bootstapping. Note:
1. Once non-trivial blob obtained and commited in tx all of its
borrowed bundles have to be eventually released (see db).
diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make
index fd66d04202..d213ccbe49 100644
--- a/ydb/core/tablet_flat/ut/ya.make
+++ b/ydb/core/tablet_flat/ut/ya.make
@@ -22,6 +22,7 @@ SRCS(
flat_executor_ut.cpp
flat_executor_database_ut.cpp
flat_executor_gclogic_ut.cpp
+ flat_executor_leases_ut.cpp
flat_range_cache_ut.cpp
flat_row_versions_ut.cpp
flat_sausagecache_ut.cpp
@@ -64,6 +65,7 @@ PEERDIR(
ydb/core/scheme
ydb/core/tablet_flat/test/libs/exec
ydb/core/tablet_flat/test/libs/table
+ ydb/core/testlib
ydb/library/yql/public/udf/service/exception_policy
)
diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make
index 6b1226bf01..1c3135c1eb 100644
--- a/ydb/core/tablet_flat/ya.make
+++ b/ydb/core/tablet_flat/ya.make
@@ -7,6 +7,7 @@ OWNER(
SRCS(
defs.h
+ flat_boot_lease.cpp
flat_boot_misc.cpp
flat_comp.cpp
flat_comp_create.cpp
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index 12e37f4d87..ff09f2df27 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -120,6 +120,9 @@ namespace NActors {
node->LogSettings->MessagePrefix = " node " + ToString(nodeId);
auto* nodeAppData = node->GetAppData<NKikimr::TAppData>();
+ if (!UseRealThreads) {
+ nodeAppData->MonotonicTimeProvider = MonotonicTimeProvider;
+ }
nodeAppData->DataShardExportFactory = app0->DataShardExportFactory;
nodeAppData->DomainsInfo = app0->DomainsInfo;
nodeAppData->ChannelProfiles = app0->ChannelProfiles;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 68438fc419..ccddfee1d1 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -207,9 +207,6 @@ namespace Tests {
const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode;
SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams);
- CreateBootstrapTablets();
- SetupStorage();
-
for (ui32 nodeIdx = 0; nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) {
SetupDomainLocalService(nodeIdx);
Runtime->GetAppData(nodeIdx).AuthConfig.MergeFrom(Settings->AuthConfig);
@@ -223,6 +220,9 @@ namespace Tests {
SetupConfigurators(nodeIdx);
SetupProxies(nodeIdx);
}
+
+ CreateBootstrapTablets();
+ SetupStorage();
}
void TServer::SetupMessageBus(ui16 port, const TString &tracePath) {
diff --git a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
index 8345d93485..e6a36f0007 100644
--- a/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
+++ b/ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp
@@ -96,11 +96,27 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct
LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID()
<< " HANDLE TEvAcquireReadStep");
+ IncCounter(COUNTER_REQ_ACQUIRE_READ_STEP);
+
if (Y_UNLIKELY(Stopping)) {
// We won't be able to commit anyway
return;
}
+ if (ReadOnlyLeaseEnabled()) {
+ // We acquire read step using a read-only lease from executor
+ // It is guaranteed that any future generation was not running at
+ // the time ConfirmReadOnlyLease was called.
+ TActorId sender = ev->Sender;
+ ui64 cookie = ev->Cookie;
+ ui64 step = Max(VolatileState.LastSentStep, VolatileState.LastAcquired);
+ VolatileState.LastAcquired = step;
+ Executor()->ConfirmReadOnlyLease([this, sender, cookie, step]() {
+ Send(sender, new TEvTxProxy::TEvAcquireReadStepResult(TabletID(), step), 0, cookie);
+ });
+ return;
+ }
+
VolatileState.AcquireReadStepPending.emplace_back(ev->Sender, ev->Cookie);
MaybeFlushAcquireReadStep(ctx);
}
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 1fbc46f449..49aa706c77 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -50,6 +50,8 @@ const ui32 TTxCoordinator::Schema::CurrentVersion = 1;
TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
+ , EnableLeaderLeases(0, 0, 1)
+ , MinLeaderLeaseDurationUs(250000, 1000, 5000000)
#ifdef COORDINATOR_LOG_TO_FILE
, DebugName(Sprintf("/tmp/coordinator_db_log_%" PRIu64 ".%" PRIi32 ".%" PRIu64 ".gz", TabletID(), getpid(), tablet.LocalId()))
, DebugLogFile(DebugName)
@@ -316,7 +318,28 @@ void TTxCoordinator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, cons
}
}
+void TTxCoordinator::IcbRegister() {
+ if (!IcbRegistered) {
+ AppData()->Icb->RegisterSharedControl(EnableLeaderLeases, "CoordinatorControls.EnableLeaderLeases");
+ AppData()->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "CoordinatorControls.MinLeaderLeaseDurationUs");
+ IcbRegistered = true;
+ }
+}
+
+bool TTxCoordinator::ReadOnlyLeaseEnabled() {
+ IcbRegister();
+ ui64 value = EnableLeaderLeases;
+ return value != 0;
+}
+
+TDuration TTxCoordinator::ReadOnlyLeaseDuration() {
+ IcbRegister();
+ ui64 value = MinLeaderLeaseDurationUs;
+ return TDuration::MicroSeconds(value);
+}
+
void TTxCoordinator::OnActivateExecutor(const TActorContext &ctx) {
+ IcbRegister();
TryInitMonCounters(ctx);
Executor()->RegisterExternalTabletCounters(TabletCountersPtr);
Execute(CreateTxSchema(), ctx);
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index 28a817ae46..82f5f4b4d2 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -8,6 +8,8 @@
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tx_processing.h>
+#include <ydb/core/control/immediate_control_board_wrapper.h>
+#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -461,6 +463,10 @@ private:
i64 CurrentTxInFly;
};
+ bool IcbRegistered = false;
+ TControlWrapper EnableLeaderLeases;
+ TControlWrapper MinLeaderLeaseDurationUs;
+
TVolatileState VolatileState;
TConfig Config;
TCoordinatorMonCounters MonCounters;
@@ -503,6 +509,14 @@ private:
return TActor::Die(ctx);
}
+ void IcbRegister();
+ bool ReadOnlyLeaseEnabled() override;
+ TDuration ReadOnlyLeaseDuration() override;
+
+ void IncCounter(NFlatTxCoordinator::ECumulativeCounters counter, ui64 num = 1) {
+ TabletCounters->Cumulative()[counter].Increment(num);
+ }
+
void OnActivateExecutor(const TActorContext &ctx) override;
void DefaultSignalTabletActive(const TActorContext &ctx) override {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 3a0de1dc81..71e28bfb17 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -137,6 +137,10 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, ReadColumnsScanInUserPool(0, 0, 1)
, BackupReadAheadLo(0, 0, 64*1024*1024)
, BackupReadAheadHi(0, 0, 128*1024*1024)
+ , EnablePrioritizedMvccSnapshotReads(0, 0, 1)
+ , EnableUnprotectedMvccSnapshotReads(0, 0, 1)
+ , EnableLeaderLeases(0, 0, 1)
+ , MinLeaderLeaseDurationUs(250000, 1000, 5000000)
, DataShardSysTables(InitDataShardSysTables(this))
, ChangeSenderActivator(info->TabletID)
, ChangeExchangeSplitter(this)
@@ -262,6 +266,7 @@ void TDataShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorCo
void TDataShard::Cleanup(const TActorContext& ctx) {
//PipeClientCache->Detach(ctx);
if (RegistrationSended) {
+ ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep());
ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnregisterTablet(TabletID()));
}
@@ -275,26 +280,54 @@ void TDataShard::Cleanup(const TActorContext& ctx) {
}
}
-void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::OnActivateExecutor: tablet " << TabletID() << " actor " << ctx.SelfID);
+void TDataShard::IcbRegister() {
+ if (!IcbRegistered) {
+ auto* appData = AppData();
+
+ appData->Icb->RegisterSharedControl(DisableByKeyFilter, "DataShardControls.DisableByKeyFilter");
+ appData->Icb->RegisterSharedControl(MaxTxInFly, "DataShardControls.MaxTxInFly");
+ appData->Icb->RegisterSharedControl(MaxTxLagMilliseconds, "DataShardControls.MaxTxLagMilliseconds");
+ appData->Icb->RegisterSharedControl(DataTxProfileLogThresholdMs, "DataShardControls.DataTxProfile.LogThresholdMs");
+ appData->Icb->RegisterSharedControl(DataTxProfileBufferThresholdMs, "DataShardControls.DataTxProfile.BufferThresholdMs");
+ appData->Icb->RegisterSharedControl(DataTxProfileBufferSize, "DataShardControls.DataTxProfile.BufferSize");
+
+ appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
+ appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
+ appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
+ appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
- AppData(ctx)->Icb->RegisterSharedControl(DisableByKeyFilter, "DataShardControls.DisableByKeyFilter");
- AppData(ctx)->Icb->RegisterSharedControl(MaxTxInFly, "DataShardControls.MaxTxInFly");
- AppData(ctx)->Icb->RegisterSharedControl(MaxTxLagMilliseconds, "DataShardControls.MaxTxLagMilliseconds");
- AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileLogThresholdMs, "DataShardControls.DataTxProfile.LogThresholdMs");
- AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileBufferThresholdMs, "DataShardControls.DataTxProfile.BufferThresholdMs");
- AppData(ctx)->Icb->RegisterSharedControl(DataTxProfileBufferSize, "DataShardControls.DataTxProfile.BufferSize");
+ appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled");
+ appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool");
- AppData(ctx)->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
- AppData(ctx)->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
- AppData(ctx)->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
- AppData(ctx)->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
+ appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
+ appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");
- AppData(ctx)->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled");
- AppData(ctx)->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool");
+ appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
+ appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
+
+ appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases");
+ appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs");
+
+ IcbRegistered = true;
+ }
+}
+
+bool TDataShard::ReadOnlyLeaseEnabled() {
+ IcbRegister();
+ ui64 value = EnableLeaderLeases;
+ return value != 0;
+}
+
+TDuration TDataShard::ReadOnlyLeaseDuration() {
+ IcbRegister();
+ ui64 value = MinLeaderLeaseDurationUs;
+ return TDuration::MicroSeconds(value);
+}
+
+void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::OnActivateExecutor: tablet " << TabletID() << " actor " << ctx.SelfID);
- AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
- AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");
+ IcbRegister();
// OnActivateExecutor might be called multiple times for a follower
// but the counters should be initialized only once
@@ -319,6 +352,16 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
}
void TDataShard::SwitchToWork(const TActorContext &ctx) {
+ if (IsMvccEnabled() && (
+ SnapshotManager.GetPerformedUnprotectedReads() ||
+ SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step))
+ {
+ // We will need to wait until mediator state is fully restored before
+ // processing new immediate transactions.
+ MediatorStateWaiting = true;
+ CheckMediatorStateRestored();
+ }
+
SyncConfig();
PlanQueue.Progress(ctx);
OutReadSets.ResendAll(ctx);
@@ -355,10 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
<< DatashardStateName(State) << " tabletId " << TabletID()
<< " mediators count is " << ProcessingParams->MediatorsSize()
+ << " coordinators count is " << ProcessingParams->CoordinatorsSize()
<< " buckets per mediator " << ProcessingParams->GetTimeCastBucketsPerMediator());
RegistrationSended = true;
ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams));
+
+ // Subscribe to all known coordinators
+ for (ui64 coordinatorId : ProcessingParams->GetCoordinators()) {
+ size_t index = CoordinatorSubscriptions.size();
+ auto res = CoordinatorSubscriptionById.emplace(coordinatorId, index);
+ if (res.second) {
+ auto& subscription = CoordinatorSubscriptions.emplace_back();
+ subscription.CoordinatorId = coordinatorId;
+ ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvSubscribeReadStep(coordinatorId));
+ ++CoordinatorSubscriptionsPending;
+ }
+ }
}
void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
@@ -1194,7 +1250,8 @@ TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
- return TRowVersion((++edge).Step, ::Max<ui64>());
+ TRowVersion candidate = TRowVersion((++edge).Step, ::Max<ui64>());
+ return Max(candidate, SnapshotManager.GetImmediateWriteEdge());
}
TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
@@ -1218,6 +1275,9 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
+ // Since incomplete transactions are still inflight, the actual
+ // version will stick to the first incomplete transaction is queue,
+ // effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
@@ -1235,12 +1295,41 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
- // This is currently active step for immediate writes, not that when
- // writeEdge is equal to some (PlanStep, Max<ui64>()) that means everything
- // up to this point is "fixed" and cannot be changed. In that case we
- // choose at least PlanStep + 1 for new writes.
- ui64 writeStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, (++writeEdge).Step);
- return TRowVersion(writeStep, ::Max<ui64>());
+ // Normally we stick transactions to the end of the last known mediator step
+ // Note this calculations only happen when we don't have distributed
+ // transactions left in queue, and we won't have any more transactions
+ // up to the current mediator time. The mediator time itself may be stale,
+ // in which case we may have evidence of its higher value via complete and
+ // incomplete edges above.
+ const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
+ TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+ switch (mode) {
+ case EMvccTxMode::ReadOnly: {
+ // We want to include everything that was potentially confirmed to
+ // users, but we don't want to include anything that is not replied
+ // at the start of this read.
+ // Note it's only possible to have ImmediateWriteEdge > mediatorEdge
+ // when ImmediateWriteEdge == mediatorEdge + 1
+ return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied(), SnapshotManager.GetUnprotectedReadEdge());
+ }
+
+ case EMvccTxMode::ReadWrite: {
+ // We must use at least a previously used immediate write edge
+ // But we must also avoid trumpling over any unprotected mvcc
+ // snapshot reads that have occurred.
+ // Note it's only possible to go past the last known mediator step
+ // is when we had an unprotected read, which itself happens at the
+ // last mediator step. So we may only ever have a +1 step, never
+ // anything more.
+ TRowVersion postReadEdge = SnapshotManager.GetPerformedUnprotectedReads()
+ ? SnapshotManager.GetUnprotectedReadEdge().Next()
+ : TRowVersion::Min();
+ return Max(mediatorEdge, writeEdge.Next(), postReadEdge, SnapshotManager.GetImmediateWriteEdge());
+ }
+ }
+
+ Y_FAIL("unreachable");
}
TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
@@ -1260,6 +1349,239 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
return mvccVersion;
}
+TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges(
+ const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc)
+{
+ TPromotePostExecuteEdges res;
+
+ res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc);
+
+ switch (mode) {
+ case EPromotePostExecuteEdges::ReadOnly:
+ // We want read-only immediate transactions to be readonly, thus
+ // don't promote the complete edge unnecessarily. On restarts we
+ // will assume anything written is potentially replied anyway,
+ // even if it has never been read.
+ break;
+
+ case EPromotePostExecuteEdges::RepeatableRead: {
+ bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads();
+ if (unprotectedReads) {
+ // We want to use unprotected reads, but we need to make sure it's properly marked first
+ if (!SnapshotManager.GetPerformedUnprotectedReads()) {
+ SnapshotManager.SetPerformedUnprotectedReads(true, txc);
+ res.HadWrites = true;
+ }
+ if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
+ // We need to wait for completion until the flag is committed
+ res.WaitCompletion = true;
+ }
+ SnapshotManager.PromoteUnprotectedReadEdge(version);
+ } else if (SnapshotManager.GetPerformedUnprotectedReads()) {
+ // We want to drop the flag as soon as possible
+ SnapshotManager.SetPerformedUnprotectedReads(false, txc);
+ res.HadWrites = true;
+ }
+
+ // We want to promote the complete edge when protected reads are
+ // used or when we're already writing something anyway.
+ if (res.HadWrites || !unprotectedReads) {
+ res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc);
+ if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) {
+ // We need to wait for completion because some other transaction
+ // has moved complete edge, but it's not committed yet.
+ res.WaitCompletion = true;
+ }
+ }
+
+ break;
+ }
+
+ case EPromotePostExecuteEdges::ReadWrite: {
+ if (version.Step <= GetMaxObservedStep()) {
+ res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc);
+ }
+ res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc);
+ break;
+ }
+ }
+
+ return res;
+}
+
+ui64 TDataShard::GetMaxObservedStep() const {
+ return Max(
+ Pipeline.GetLastPlannedTx().Step,
+ SnapshotManager.GetCompleteEdge().Step,
+ SnapshotManager.GetIncompleteEdge().Step,
+ SnapshotManager.GetUnprotectedReadEdge().Step,
+ MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0);
+}
+
+void TDataShard::SendImmediateWriteResult(
+ const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie)
+{
+ const ui64 step = version.Step;
+ const ui64 observedStep = GetMaxObservedStep();
+ if (step <= observedStep) {
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
+ Send(target, event, 0, cookie);
+ return;
+ }
+
+ MediatorDelayedReplies.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(version),
+ std::forward_as_tuple(target, THolder<IEventBase>(event), cookie));
+
+ // Try to subscribe to the next step, when needed
+ if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+ MediatorTimeCastWaitingSteps.insert(step);
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+}
+
+void TDataShard::SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie) {
+ if (IsFollower() || !ReadOnlyLeaseEnabled()) {
+ // We just send possibly stale result (old behavior)
+ Send(target, event, 0, cookie);
+ return;
+ }
+
+ struct TSendState : public TThrRefBase {
+ TActorId Target;
+ THolder<IEventBase> Event;
+ ui64 Cookie;
+
+ TSendState(const TActorId& target, IEventBase* event, ui64 cookie)
+ : Target(target)
+ , Event(event)
+ , Cookie(cookie)
+ { }
+ };
+
+ if (!readTime) {
+ readTime = AppData()->MonotonicTimeProvider->Now();
+ }
+
+ Executor()->ConfirmReadOnlyLease(readTime, [this, state = MakeIntrusive<TSendState>(target, event, cookie)] {
+ Send(state->Target, state->Event.Release(), 0, state->Cookie);
+ });
+}
+
+void TDataShard::SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie) {
+ SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie);
+}
+
+void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
+ for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
+ const ui64 step = it->first.Step;
+
+ if (step <= mediatorStep) {
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
+ Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
+ it = MediatorDelayedReplies.erase(it);
+ continue;
+ }
+
+ // Try to subscribe to the next step, when needed
+ if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+ MediatorTimeCastWaitingSteps.insert(step);
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+ break;
+ }
+}
+
+void TDataShard::CheckMediatorStateRestored() {
+ if (!MediatorStateWaiting ||
+ !RegistrationSended ||
+ !MediatorTimeCastEntry ||
+ CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>())
+ {
+ // We are not waiting or not ready to make a decision
+ if (MediatorStateWaiting &&
+ MediatorTimeCastEntry &&
+ CoordinatorPrevReadStepMax == Max<ui64>() &&
+ !MediatorStateBackupInitiated)
+ {
+ // It is possible we don't have coordinators with new protocol support
+ // Use a backup plan of acquiring a read snapshot for restoring the read step
+ Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup);
+ MediatorStateBackupInitiated = true;
+ }
+ return;
+ }
+
+ // CoordinatorPrevReadStepMax shows us what is the next minimum step that
+ // may be acquired as a snapshot. This tells as that no previous read
+ // could have happened after this step, even if it has been acquired.
+ // CoordinatorPrevReadStepMin shows us the maximum step that could have
+ // been acquired before we subscribed. Even if the next step is very
+ // large it may be used to infer an erlier step, as previous generation
+ // could not have read any step that was not acquired.
+ // When some coordinators are still pending we use CoordinatorPrevReadStepMax
+ // as a worst case read step in the future, hoping to make a tighter
+ // prediction while we wait for that.
+ // Note we always need to wait for CoordinatorPrevReadStepMax because
+ // previous generation may have observed it and may have replied to
+ // immediate writes at that step.
+ const ui64 waitStep = CoordinatorPrevReadStepMax;
+ const ui64 readStep = CoordinatorSubscriptionsPending == 0
+ ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
+ : CoordinatorPrevReadStepMax;
+
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
+
+ // WARNING: we must perform this check BEFORE we update unprotected read edge
+ // We may enter this code path multiple times, and we expect that the above
+ // read step may be refined while we wait based on pessimistic backup step.
+ if (GetMaxObservedStep() < waitStep) {
+ // We need to wait until we observe mediator step that is at least
+ // as large as the step we found.
+ if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), waitStep));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << waitStep << " from mediator time cast");
+ }
+ return;
+ }
+
+ // Using the inferred last read step we restore the pessimistic unprotected
+ // read edge. Note we only need to do so if there have actually been any
+ // unprotected reads in this datashard history. We also need to make sure
+ // this edge is at least one smaller than ImmediateWriteEdge when we know
+ // we started unconfirmed immediate writes in the last generation.
+ if (SnapshotManager.GetPerformedUnprotectedReads()) {
+ const TRowVersion lastReadEdge(readStep, Max<ui64>());
+ const TRowVersion preImmediateWriteEdge =
+ SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
+ ? SnapshotManager.GetImmediateWriteEdge().Prev()
+ : TRowVersion::Min();
+ SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
+ }
+
+ // Promote the replied immediate write edge up to the currently observed step
+ // This is needed to make sure we read any potentially replied immediate
+ // writes before the restart, and conversely don't accidentally read any
+ // data that is definitely not replied yet.
+ if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+ const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(
+ Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+ }
+
+ MediatorStateWaiting = false;
+
+ // Resend all waiting messages
+ TVector<THolder<IEventHandle>> msgs;
+ msgs.swap(MediatorStateWaitingMsgs);
+ for (auto& ev : msgs) {
+ TActivationContext::Send(ev.Release());
+ }
+}
+
NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code) {
using EResult = NMiniKQL::IEngineFlat::EResult;
@@ -1410,7 +1732,7 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectReasons.push_back("decided to reject due to given RejectProbability");
}
- size_t totalInFly = (TxInFly() + ImmediateInFly() + ProposeQueue.Size() + TxWaiting());
+ size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting());
if (totalInFly > GetMaxTxInFly()) {
reject = true;
rejectReasons.push_back("MaxTxInFly was exceeded");
@@ -1481,10 +1803,21 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
}
void TDataShard::UpdateProposeQueueSize() const {
- SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
+ SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
}
void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
+ // Check if we need to delay an immediate transaction
+ if (MediatorStateWaiting &&
+ (ev->Get()->GetFlags() & TTxFlags::Immediate) &&
+ !(ev->Get()->GetFlags() & TTxFlags::ForceOnline))
+ {
+ // We cannot calculate correct version until we restore mediator state
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
if (Pipeline.HasProposeDelayers()) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
@@ -1897,7 +2230,30 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev,
MediatorTimeCastEntry = ev->Get()->Entry;
Y_VERIFY(MediatorTimeCastEntry);
+ SendAfterMediatorStepActivate(MediatorTimeCastEntry->Get(TabletID()));
+
Pipeline.ActivateWaitingTxOps(ctx);
+
+ CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx) {
+ auto* msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
+ << " coordinator " << msg->CoordinatorId
+ << " last step " << msg->LastReadStep
+ << " next step " << msg->ReadStep->Get());
+ auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
+ Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
+ "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
+ size_t index = it->second;
+ auto& subscription = CoordinatorSubscriptions.at(index);
+ subscription.ReadStep = msg->ReadStep;
+ CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep);
+ CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep);
+ --CoordinatorSubscriptionsPending;
+ CheckMediatorStateRestored();
}
void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx) {
@@ -1911,13 +2267,27 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const
for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end() && *it <= step;)
it = MediatorTimeCastWaitingSteps.erase(it);
+ SendAfterMediatorStepActivate(step);
+
Pipeline.ActivateWaitingTxOps(ctx);
+
+ CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) {
+ if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) {
+ // We are still waiting for new protol coordinator state
+ // TODO: send an old snapshot request to coordinators
+ }
}
bool TDataShard::WaitPlanStep(ui64 step) {
if (step <= Pipeline.GetLastPlannedTx().Step)
return false;
+ if (step <= SnapshotManager.GetCompleteEdge().Step)
+ return false;
+
if (MediatorTimeCastEntry && step <= MediatorTimeCastEntry->Get(TabletID()))
return false;
@@ -1943,7 +2313,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
auto &rec = ev->Get()->Record;
if (rec.HasMvccSnapshot()) {
TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
- TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge();
+ TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
if (rowVersion >= unreadableEdge) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
return true;
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 3b3c9d9d6f..c55cf209f5 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -132,6 +132,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
}
void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx) {
+ if (MediatorStateWaiting) {
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
Executor()->Execute(new TTxUploadRows(this, ev), ctx);
} else {
@@ -140,6 +145,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
}
void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx) {
+ if (MediatorStateWaiting) {
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
Executor()->Execute(new TTxEraseRows(this, ev), ctx);
} else {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index c5d2bf68c6..4128209133 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -209,6 +209,21 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
template <typename TEvRequest, typename TEvResponse>
+void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie) {
+ Y_VERIFY(Result);
+
+ if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) {
+ self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS);
+ } else {
+ self->IncCounter(COUNTER_BULK_UPSERT_ERROR);
+ }
+
+ target = Ev->Sender;
+ event = std::move(Result);
+ cookie = 0;
+}
+
+template <typename TEvRequest, typename TEvResponse>
void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) {
Y_VERIFY(Result);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 262027b14f..928558de2d 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -23,6 +23,7 @@ public:
protected:
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion);
+ void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
void SendResult(TDataShard* self, const TActorContext& ctx);
TVector<IChangeCollector::TChange> GetCollectedChanges() const;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 826819596f..574c5ff647 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -204,7 +204,7 @@ bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
return true;
}
-void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
+TDirectTxResult TDirectTxErase::GetResult(TDataShard* self) {
Y_VERIFY(Result);
if (Result->Record.GetStatus() == NKikimrTxDataShard::TEvEraseRowsResponse::OK) {
@@ -213,7 +213,11 @@ void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
self->IncCounter(COUNTER_ERASE_ROWS_ERROR);
}
- ctx.Send(Ev->Sender, std::move(Result));
+ TDirectTxResult res;
+ res.Target = Ev->Sender;
+ res.Event = std::move(Result);
+ res.Cookie = 0;
+ return res;
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxErase::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 689b868a36..7663b47d38 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -66,7 +66,7 @@ public:
NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
- void SendResult(TDataShard* self, const TActorContext& ctx) override;
+ TDirectTxResult GetResult(TDataShard* self) override;
TVector<IChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index 145f0936cd..858247f36a 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -31,16 +31,25 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded)
}
bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
- auto [readVersion, writeVersion] = self->GetReadWriteVersions();
+ auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
if (!Impl->Execute(self, txc, readVersion, writeVersion))
return false;
- self->PromoteCompleteEdge(writeVersion.Step, txc);
+ if (self->IsMvccEnabled()) {
+ // Note: we always wait for completion, so we can ignore the result
+ self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ }
+
return true;
}
void TDirectTransaction::SendResult(TDataShard* self, const TActorContext& ctx) {
- Impl->SendResult(self, ctx);
+ auto result = Impl->GetResult(self);
+ if (MvccReadWriteVersion) {
+ self->SendImmediateWriteResult(*MvccReadWriteVersion, result.Target, result.Event.Release(), result.Cookie);
+ } else {
+ ctx.Send(result.Target, result.Event.Release(), 0, result.Cookie);
+ }
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTransaction::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index 5b42e1f1d7..e4d83188d5 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -11,11 +11,17 @@
namespace NKikimr {
namespace NDataShard {
+struct TDirectTxResult {
+ TActorId Target;
+ THolder<IEventBase> Event;
+ ui64 Cookie;
+};
+
class IDirectTx {
public:
virtual ~IDirectTx() = default;
virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0;
- virtual void SendResult(TDataShard* self, const TActorContext& ctx) = 0;
+ virtual TDirectTxResult GetResult(TDataShard* self) = 0;
virtual TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const = 0;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index 6173c0e218..abb3eda763 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -12,8 +12,10 @@ bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const
return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion);
}
-void TDirectTxUpload::SendResult(TDataShard* self, const TActorContext& ctx) {
- TCommonUploadOps::SendResult(self, ctx);
+TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
+ TDirectTxResult res;
+ TCommonUploadOps::GetResult(self, res.Target, res.Event, res.Cookie);
+ return res;
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxUpload::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 7ca84f19a7..a26226bd79 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -14,7 +14,7 @@ public:
explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
- void SendResult(TDataShard* self, const TActorContext& ctx) override;
+ TDirectTxResult GetResult(TDataShard* self) override;
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index b5a5abfa6f..e5934ac2be 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -295,6 +295,7 @@ class TDataShard
EvRequestChangeRecords,
EvRemoveChangeRecords,
EvReplicationSourceOffsets,
+ EvMediatorRestoreBackup,
EvEnd
};
@@ -434,6 +435,8 @@ class TDataShard
// Note that keys are NOT sorted in any way
THashMap<TString, TVector<TSplitKey>> SourceOffsets;
};
+
+ struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {};
};
struct Schema : NIceDb::Schema {
@@ -791,6 +794,10 @@ class TDataShard
Sys_NextChangeRecordOrder, // 36 Next order of change record
Sys_LastChangeRecordGroup, // 37 Last group number of change records
+ SysMvcc_UnprotectedReads, // 38 Shard may have performed unprotected mvcc reads when non-zero
+ SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled
+ SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled
+
// reserved
SysPipeline_Flags = 1000,
SysPipeline_LimitActiveTx,
@@ -800,6 +807,9 @@ class TDataShard
static_assert(ESysTableKeys::Sys_SubDomainOwnerId == 33, "Sys_SubDomainOwnerId changed its value");
static_assert(ESysTableKeys::Sys_SubDomainLocalPathId == 34, "Sys_SubDomainLocalPathId changed its value");
static_assert(ESysTableKeys::Sys_SubDomainOutOfSpace == 35, "Sys_SubDomainOutOfSpace changed its value");
+ static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value");
+ static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value");
+ static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value");
static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000
@@ -890,7 +900,9 @@ class TDataShard
void Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx);
void Handle(TEvDataShard::TEvReturnBorrowedPart::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvReturnBorrowedPartAck::TPtr& ev, const TActorContext& ctx);
@@ -1001,6 +1013,9 @@ class TDataShard
void OnStopGuardStarting(const TActorContext &ctx);
void OnStopGuardComplete(const TActorContext &ctx);
void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) override;
+ void IcbRegister();
+ bool ReadOnlyLeaseEnabled() override;
+ TDuration ReadOnlyLeaseDuration() override;
void OnActivateExecutor(const TActorContext &ctx) override;
void Cleanup(const TActorContext &ctx);
@@ -1214,6 +1229,16 @@ public:
return BackupReadAheadHi;
}
+ bool GetEnablePrioritizedMvccSnapshotReads() const {
+ ui64 value = EnablePrioritizedMvccSnapshotReads;
+ return value != 0;
+ }
+
+ bool GetEnableUnprotectedMvccSnapshotReads() const {
+ ui64 value = EnableUnprotectedMvccSnapshotReads;
+ return value != 0;
+ }
+
template <typename T>
void ReleaseCache(T& tx) {
ReleaseTxCache(tx.GetTxCacheUsage());
@@ -1400,7 +1425,28 @@ public:
// Returns a suitable row version for performing a transaction
TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const;
+ enum class EPromotePostExecuteEdges {
+ ReadOnly,
+ RepeatableRead,
+ ReadWrite,
+ };
+
+ struct TPromotePostExecuteEdges {
+ bool HadWrites = false;
+ bool WaitCompletion = false;
+ };
+
TReadWriteVersions GetReadWriteVersions(TOperation* op = nullptr) const;
+ TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(
+ const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc);
+ ui64 GetMaxObservedStep() const;
+ void SendImmediateWriteResult(
+ const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ void SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ void SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ void SendAfterMediatorStepActivate(ui64 mediatorStep);
+
+ void CheckMediatorStateRestored();
void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const;
@@ -1948,8 +1994,38 @@ private:
TS3UploadsManager S3Uploads;
TS3DownloadsManager S3Downloads;
+ struct TMediatorDelayedReply {
+ TActorId Target;
+ THolder<IEventBase> Event;
+ ui64 Cookie;
+
+ TMediatorDelayedReply(const TActorId& target, THolder<IEventBase> event, ui64 cookie)
+ : Target(target)
+ , Event(std::move(event))
+ , Cookie(cookie)
+ { }
+ };
+
TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
TSet<ui64> MediatorTimeCastWaitingSteps;
+ TMultiMap<TRowVersion, TMediatorDelayedReply> MediatorDelayedReplies;
+
+ struct TCoordinatorSubscription {
+ ui64 CoordinatorId;
+ TMediatorTimecastReadStep::TCPtr ReadStep;
+ };
+
+ TVector<TCoordinatorSubscription> CoordinatorSubscriptions;
+ THashMap<ui64, size_t> CoordinatorSubscriptionById;
+ size_t CoordinatorSubscriptionsPending = 0;
+ ui64 CoordinatorPrevReadStepMin = 0;
+ ui64 CoordinatorPrevReadStepMax = Max<ui64>();
+
+ TVector<THolder<IEventHandle>> MediatorStateWaitingMsgs;
+ bool MediatorStateWaiting = false;
+ bool MediatorStateBackupInitiated = false;
+
+ bool IcbRegistered = false;
TControlWrapper DisableByKeyFilter;
TControlWrapper MaxTxInFly;
@@ -1969,6 +2045,12 @@ private:
TControlWrapper BackupReadAheadLo;
TControlWrapper BackupReadAheadHi;
+ TControlWrapper EnablePrioritizedMvccSnapshotReads;
+ TControlWrapper EnableUnprotectedMvccSnapshotReads;
+
+ TControlWrapper EnableLeaderLeases;
+ TControlWrapper MinLeaderLeaseDurationUs;
+
// Set of InRS keys to remove from local DB.
THashSet<TReadSetKey> InRSToRemove;
TIntrusivePtr<TThrRefBase> DataShardSysTables;
@@ -2093,6 +2175,9 @@ protected:
TRACE_EVENT(NKikimrServices::TX_DATASHARD);
switch (ev->GetTypeRewrite()) {
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+ HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
@@ -2142,7 +2227,9 @@ protected:
HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle);
HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle);
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+ HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvDataShard::TEvCancelTransactionProposal, Handle);
HFuncTraced(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
HFunc(TEvDataShard::TEvReturnBorrowedPart, Handle);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 3498162c17..faf90e403b 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1581,19 +1581,21 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co
if (Self->MvccSwitchState == TSwitchState::SWITCHING) {
WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev)); // postpone tx processing till mvcc state switch is finished
} else {
+ bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
Y_VERIFY_DEBUG(ev->Get()->Record.HasMvccSnapshot());
TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId());
WaitingDataTxOps.emplace(snapshot, std::move(ev));
+ const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1;
TRowVersion unreadableEdge;
- if (!Self->WaitPlanStep(snapshot.Step + 1) && snapshot < (unreadableEdge = GetUnreadableEdge())) {
- ActivateWaitingTxOps(unreadableEdge, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
+ if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) {
+ ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
}
}
return true;
}
-void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) {
+void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) {
if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
return;
@@ -1611,8 +1613,12 @@ void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx)
activated = true;
}
- if (minWait == TRowVersion::Max() || Self->WaitPlanStep(minWait.Step + 1) || minWait >= (edge = GetUnreadableEdge()))
+ if (minWait == TRowVersion::Max() ||
+ Self->WaitPlanStep(prioritizedReads ? minWait.Step : minWait.Step + 1) ||
+ minWait >= (edge = GetUnreadableEdge(prioritizedReads)))
+ {
break;
+ }
// Async MediatorTimeCastEntry update, need to rerun activation
}
@@ -1626,7 +1632,8 @@ void TPipeline::ActivateWaitingTxOps(const TActorContext& ctx) {
if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
return;
- ActivateWaitingTxOps(GetUnreadableEdge(), ctx);
+ bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
+ ActivateWaitingTxOps(GetUnreadableEdge(prioritizedReads), prioritizedReads, ctx);
}
TRowVersion TPipeline::GetReadEdge() const {
@@ -1645,29 +1652,51 @@ TRowVersion TPipeline::GetReadEdge() const {
return TRowVersion(step, Max<ui64>());
}
-TRowVersion TPipeline::GetUnreadableEdge() const {
- auto last = TRowVersion(
+TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const {
+ const auto last = TRowVersion(
GetLastActivePlannedOpStep(),
GetLastActivePlannedOpId());
auto it = Self->TransQueue.PlannedTxs.upper_bound(TStepOrder(last.Step, last.TxId));
while (it != Self->TransQueue.PlannedTxs.end()) {
- last = TRowVersion(it->Step, it->TxId);
- if (!Self->TransQueue.FindTxInFly(last.TxId)->IsReadOnly()) {
+ const auto next = TRowVersion(it->Step, it->TxId);
+ if (!Self->TransQueue.FindTxInFly(next.TxId)->IsReadOnly()) {
// If there's any non-read-only planned tx we don't have in the
// dependency tracker yet, we absolutely cannot read from that
// version.
- return last;
+ return next;
}
++it;
}
- // It looks like we have an empty plan queue (or it's read-only), so we
- // use a rough estimate of a point we would use for immediate writes in
- // the far future. That point in time is not complete yet and cannot be
- // used for snapshot reads.
- last = TRowVersion(LastPlannedTx.Step, LastPlannedTx.TxId);
- ui64 step = Max(Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0, (++last).Step);
- return TRowVersion(step, Max<ui64>());
+ // It looks like we have an empty plan queue (or it's read-only), so we use
+ // a rough estimate of a point in time we would use for immediate writes
+ // in the distant future. That point in time possibly has some unfinished
+ // transactions, but they would be resolved using dependency tracker. Here
+ // we use an estimate of the observed mediator step (including possible past
+ // generations). Note that we also update CompleteEdge when the distributed
+ // queue is empty, but we have been performing immediate writes and thus
+ // observing an updated mediator timecast step.
+ const ui64 mediatorStep = Max(
+ Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0,
+ Self->SnapshotManager.GetIncompleteEdge().Step,
+ Self->SnapshotManager.GetCompleteEdge().Step,
+ LastPlannedTx.Step);
+
+ // Using an observed mediator step we conclude that we have observed all
+ // distributed transactions up to the end of that step.
+ const TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+ if (prioritizeReads) {
+ // We are prioritizing reads, and we are ok with blocking immediate writes
+ // in the current step. So the first unreadable version is actually in
+ // the next step.
+ return mediatorEdge.Next();
+ } else {
+ // We cannot block immediate writes up to this edge, thus we actually
+ // need to wait until the edge progresses above this version. This
+ // would happen when mediator timecast moves to the next step.
+ return mediatorEdge;
+ }
}
void TPipeline::AddCompletingOp(const TOperation::TPtr& op) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 67c35260b2..2d0d76b100 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -329,11 +329,11 @@ public:
ui64 WaitingTxs() const { return WaitingDataTxOps.size(); }
bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
- void ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx);
+ void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx);
void ActivateWaitingTxOps(const TActorContext& ctx);
TRowVersion GetReadEdge() const;
- TRowVersion GetUnreadableEdge() const;
+ TRowVersion GetUnreadableEdge(bool prioritizedReads) const;
void AddCompletingOp(const TOperation::TPtr& op);
void RemoveCompletingOp(const TOperation::TPtr& op);
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index da1219bb9e..39426396f3 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -30,9 +30,11 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
TRowVersion completeEdge = TRowVersion::Min();
TRowVersion incompleteEdge = TRowVersion::Min();
TRowVersion lowWatermark = TRowVersion::Min();
+ TRowVersion immediateWriteEdge = TRowVersion::Min();
ui32 mvccState = 0;
ui64 keepSnapshotTimeout = 0;
+ ui64 unprotectedReads = 0;
TSnapshotMap snapshots;
@@ -43,12 +45,15 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
// We don't currently support mvcc on the follower
ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId);
}
{
@@ -86,9 +91,19 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
MinWriteVersion = minWriteVersion;
MvccState = static_cast<EMvccState>(mvccState);
KeepSnapshotTimeout = keepSnapshotTimeout;
+ PerformedUnprotectedReads = (unprotectedReads != 0);
CompleteEdge = completeEdge;
IncompleteEdge = incompleteEdge;
LowWatermark = lowWatermark;
+ ImmediateWriteEdge = immediateWriteEdge;
+ if (ImmediateWriteEdge.Step <= Max(CompleteEdge.Step, IncompleteEdge.Step)) {
+ ImmediateWriteEdgeReplied = immediateWriteEdge;
+ } else {
+ // We cannot be sure which writes we have replied to
+ // Datashard will restore mediator state and decide
+ ImmediateWriteEdgeReplied.Step = Max(CompleteEdge.Step, IncompleteEdge.Step);
+ ImmediateWriteEdgeReplied.TxId = Max<ui64>();
+ }
CommittedCompleteEdge = completeEdge;
Snapshots = std::move(snapshots);
}
@@ -208,6 +223,82 @@ bool TSnapshotManager::PromoteIncompleteEdge(TOperation* op, TTransactionContext
return false;
}
+TRowVersion TSnapshotManager::GetImmediateWriteEdge() const {
+ return ImmediateWriteEdge;
+}
+
+TRowVersion TSnapshotManager::GetImmediateWriteEdgeReplied() const {
+ return ImmediateWriteEdgeReplied;
+}
+
+void TSnapshotManager::SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeStep, version.Step);
+ Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, version.TxId);
+ ImmediateWriteEdge = version;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+ if (!IsMvccEnabled())
+ return false;
+
+ if (version > ImmediateWriteEdge) {
+ SetImmediateWriteEdge(version, txc);
+
+ return true;
+ }
+
+ return false;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdgeReplied(const TRowVersion& version) {
+ if (!IsMvccEnabled())
+ return false;
+
+ if (version > ImmediateWriteEdgeReplied) {
+ ImmediateWriteEdgeReplied = version;
+ return true;
+ }
+
+ return false;
+}
+
+TRowVersion TSnapshotManager::GetUnprotectedReadEdge() const {
+ return UnprotectedReadEdge;
+}
+
+bool TSnapshotManager::PromoteUnprotectedReadEdge(const TRowVersion& version) {
+ if (IsMvccEnabled() && UnprotectedReadEdge < version) {
+ UnprotectedReadEdge = version;
+ return true;
+ }
+
+ return false;
+}
+
+bool TSnapshotManager::GetPerformedUnprotectedReads() const {
+ return PerformedUnprotectedReads;
+}
+
+bool TSnapshotManager::IsPerformedUnprotectedReadsCommitted() const {
+ return PerformedUnprotectedReadsUncommitted != 0;
+}
+
+void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc) {
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_UnprotectedReads, ui64(performedUnprotectedReads ? 1 : 0));
+ PerformedUnprotectedReads = performedUnprotectedReads;
+ PerformedUnprotectedReadsUncommitted++;
+
+ txc.OnCommitted([this] {
+ this->PerformedUnprotectedReadsUncommitted--;
+ });
+}
+
void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) {
using Schema = TDataShard::Schema;
@@ -275,7 +366,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
const TRowVersion opVersion(step, txId);
// We need to choose a version that is at least as large as all previous edges
- TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge);
+ TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge, ImmediateWriteEdge);
// This must be a version that we may have previously written to, and which
// must not be a snapshot. We don't know if there have been any immediate
@@ -300,7 +391,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
SetCompleteEdge(nicedb, nextVersion);
SetIncompleteEdge(nicedb, nextVersion);
+ SetImmediateWriteEdge(nextVersion, txc);
SetLowWatermark(nicedb, nextVersion);
+ ImmediateWriteEdgeReplied = ImmediateWriteEdge;
break;
}
@@ -316,7 +409,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
const auto minVersion = TRowVersion::Min();
SetCompleteEdge(nicedb, minVersion);
SetIncompleteEdge(nicedb, minVersion);
+ SetImmediateWriteEdge(minVersion, txc);
SetLowWatermark(nicedb, minVersion);
+ ImmediateWriteEdgeReplied = ImmediateWriteEdge;
break;
}
diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h
index 8472c5e274..c6a1c8d1f3 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.h
+++ b/ydb/core/tx/datashard/datashard_snapshots.h
@@ -172,6 +172,19 @@ public:
bool PromoteIncompleteEdge(TOperation* op, TTransactionContext& txc);
+ TRowVersion GetImmediateWriteEdge() const;
+ TRowVersion GetImmediateWriteEdgeReplied() const;
+ void SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+ bool PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+ bool PromoteImmediateWriteEdgeReplied(const TRowVersion& version);
+
+ TRowVersion GetUnprotectedReadEdge() const;
+ bool PromoteUnprotectedReadEdge(const TRowVersion& version);
+
+ bool GetPerformedUnprotectedReads() const;
+ bool IsPerformedUnprotectedReadsCommitted() const;
+ void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc);
+
EMvccState GetMvccState() const {
return MvccState;
}
@@ -244,9 +257,14 @@ private:
EMvccState MvccState = EMvccState::MvccUnspecified;
ui64 KeepSnapshotTimeout = 0;
+ bool PerformedUnprotectedReads = false;
+ ui64 PerformedUnprotectedReadsUncommitted = 0;
TRowVersion IncompleteEdge = TRowVersion::Min();
TRowVersion CompleteEdge = TRowVersion::Min();
TRowVersion LowWatermark = TRowVersion::Min();
+ TRowVersion ImmediateWriteEdge = TRowVersion::Min();
+ TRowVersion ImmediateWriteEdgeReplied = TRowVersion::Min();
+ TRowVersion UnprotectedReadEdge = TRowVersion::Min();
TRowVersion CommittedCompleteEdge = TRowVersion::Min();
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 4a9ab00878..ce6a08be45 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -282,6 +282,9 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_UNSAFE_UPLOAD_ROWS; }
+
+private:
+ TRowVersion MvccVersion = TRowVersion::Min();
};
class TDataShard::TTxExecuteMvccStateChange: public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index 273952a06d..cb085ad480 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -14,12 +14,26 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA
if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion))
return false;
- Self->PromoteCompleteEdge(writeVersion.Step, txc);
+ if (Self->IsMvccEnabled()) {
+ // Note: we always wait for completion, so we can ignore the result
+ Self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ MvccVersion = writeVersion;
+ }
+
return true;
}
void TDataShard::TTxUnsafeUploadRows::Complete(const TActorContext& ctx) {
- TCommonUploadOps::SendResult(Self, ctx);
+ TActorId target;
+ THolder<IEventBase> event;
+ ui64 cookie;
+ TCommonUploadOps::GetResult(Self, target, event, cookie);
+
+ if (MvccVersion) {
+ Self->SendImmediateWriteResult(MvccVersion, target, event.Release(), cookie);
+ } else {
+ ctx.Send(target, event.Release(), 0, cookie);
+ }
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index d2407cbf60..33197f23be 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1748,6 +1748,10 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId) {
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration) {
auto &runtime = *server->GetRuntime();
+ SimulateSleep(runtime, duration);
+}
+
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
auto sender = runtime.AllocateEdgeActor();
runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration);
runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 9ee4a455a8..e72cf689fe 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -610,6 +610,7 @@ void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId)
void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId);
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration);
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration);
void SendSQL(Tests::TServer::TPtr server,
TActorId sender,
@@ -635,4 +636,21 @@ struct IsTxResultComplete {
void WaitTabletBecomesOffline(Tests::TServer::TPtr server, ui64 tabletId);
+///
+class TDisableDataShardLogBatching : public TNonCopyable {
+public:
+ TDisableDataShardLogBatching()
+ : PrevValue(NDataShard::gAllowLogBatchingDefaultValue)
+ {
+ NDataShard::gAllowLogBatchingDefaultValue = false;
+ }
+
+ ~TDisableDataShardLogBatching() {
+ NDataShard::gAllowLogBatchingDefaultValue = PrevValue;
+ }
+
+private:
+ const bool PrevValue;
+};
+
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index b8a7873652..e5c976dd35 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -124,6 +124,14 @@ namespace NKqpHelpers {
return ev->Get()->Record.GetResponse().GetSessionId();
}
+ inline void CloseSession(TTestActorRuntime& runtime, TActorId sender, const TString& sessionId) {
+ auto request = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ request->Record.MutableRequest()->SetSessionId(sessionId);
+ runtime.Send(
+ new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()),
+ 0, /* via actor system */ true);
+ }
+
inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
const TActorId sender,
const TString& sql,
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index f2b8c97cae..edde89e3c5 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4595,6 +4595,541 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterStuckRW, UseNewEngine) {
}
}
+Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(UnprotectedReads ? 1 : 0);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+ auto table2shards = GetTableShards(server, sender, "/Root/table-2");
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // Perform an immediate write
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+
+ auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto execSnapshotRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ TString sessionId, txId;
+ TString result = beginSnapshotRequest(sessionId, txId, query);
+ CloseSession(runtime, reqSender, sessionId);
+ return result;
+ };
+
+ // Perform an immediate read, we should observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Send an immediate write transaction, but don't wait for result
+ auto senderImmediateWrite = runtime.AllocateEdgeActor();
+ SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5)
+ )")));
+
+ // We sleep for very little so datashard commits changes, but doesn't advance
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Perform an immediate read again, it should NOT observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Wait for the write to finish
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // Perform an immediate read again, it should observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Start a new write and sleep again
+ SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7)
+ )")));
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Verify this write is not observed yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Reboot the tablet
+ RebootTablet(runtime, table1shards.at(0), sender);
+
+ // Verify the write above cannot be observed after restart as well
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Send one more write and sleep again
+ auto senderImmediateWrite2 = runtime.AllocateEdgeActor();
+ SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9)
+ )")));
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Verify it is also hidden at the moment
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Wait for result of the second write
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // We should finally observe both writes
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ TString snapshotSessionId, snapshotTxId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Reboot the tablet
+ RebootTablet(runtime, table1shards.at(0), sender);
+
+ // Upsert new data after reboot
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (11, 11)"));
+
+ // Make sure datashard state is restored correctly and snapshot is not corrupted
+ UNIT_ASSERT_VALUES_EQUAL(
+ continueSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ // Make sure new snapshot will actually observe new data
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "List { Struct { Optional { Uint32: 11 } } Struct { Optional { Uint32: 11 } } } "
+ "} Struct { Bool: false }");
+}
+
+Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(2)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain);
+
+ struct TNodeState {
+ // mediator -> bucket -> [observed, passed] step
+ THashMap<ui64, THashMap<ui32, std::pair<ui64, ui64>>> Steps;
+ ui64 AllowedStep = 0;
+ };
+ THashMap<ui32, TNodeState> mediatorState;
+
+ bool mustWaitForSteps[2] = { false, false };
+
+ auto captureTimecast = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ const ui32 nodeIndex = nodeId - runtime.GetNodeId(0);
+ switch (ev->GetTypeRewrite()) {
+ case TEvMediatorTimecast::TEvUpdate::EventType: {
+ auto* msg = ev->Get<TEvMediatorTimecast::TEvUpdate>();
+ const ui64 mediatorId = msg->Record.GetMediator();
+ const ui32 bucket = msg->Record.GetBucket();
+ ui64 step = msg->Record.GetTimeBarrier();
+ auto& state = mediatorState[nodeId];
+ if (!mustWaitForSteps[nodeIndex]) {
+ // Automatically allow all new steps
+ state.AllowedStep = Max(state.AllowedStep, step);
+ }
+ Cerr << "... node " << nodeId << " observed update from " << mediatorId
+ << " for bucket " << bucket
+ << " to step " << step
+ << " (allowed " << state.AllowedStep << ")"
+ << Endl;
+ auto& [observedStep, passedStep] = state.Steps[mediatorId][bucket];
+ observedStep = Max(observedStep, step);
+ if (step >= passedStep) {
+ if (step < state.AllowedStep) {
+ step = state.AllowedStep;
+ msg->Record.SetTimeBarrier(step);
+ Cerr << "... shifted to allowed step " << step << Endl;
+ }
+ passedStep = step;
+ break;
+ }
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ case TEvMediatorTimecast::TEvWaitPlanStep::EventType: {
+ const auto* msg = ev->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
+ const ui64 tabletId = msg->TabletId;
+ const ui64 step = msg->PlanStep;
+ Cerr << "... node " << nodeId << " observed wait by " << tabletId
+ << " for step " << step
+ << Endl;
+ auto& state = mediatorState[nodeId];
+ if (state.AllowedStep < step) {
+ state.AllowedStep = step;
+ for (auto& kv1 : state.Steps) {
+ const ui64 mediatorId = kv1.first;
+ for (auto& kv2 : kv1.second) {
+ const ui32 bucket = kv2.first;
+ auto& [observedStep, passedStep] = kv2.second;
+ if (passedStep < step && passedStep < observedStep) {
+ passedStep = Min(step, observedStep);
+ auto* update = new TEvMediatorTimecast::TEvUpdate();
+ update->Record.SetMediator(mediatorId);
+ update->Record.SetBucket(bucket);
+ update->Record.SetTimeBarrier(passedStep);
+ runtime.Send(new IEventHandle(ev->GetRecipientRewrite(), ev->GetRecipientRewrite(), update), nodeIndex, /* viaActorSystem */ true);
+ }
+ }
+ }
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureTimecast);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+
+ // Make sure tablet is at node 1
+ runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0)));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender);
+ UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
+ }
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto execSnapshotRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ TString sessionId, txId;
+ TString result = beginSnapshotRequest(sessionId, txId, query);
+ CloseSession(runtime, reqSender, sessionId);
+ return result;
+ };
+
+ // Perform an immediate read, we should observe the initial write
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Stop updating mediator timecast on the second node
+ mustWaitForSteps[1] = true;
+
+ // Insert a new row and wait for result
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+
+ // Make sure tablet is at node 2
+ runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(1)));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender);
+ UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
+ }
+
+ // Perform an immediate read, we should observe confirmed writes after restart
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+
+ // Previous snapshot must see original data
+ UNIT_ASSERT_VALUES_EQUAL(
+ continueSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // However new snapshots must see updated data
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+}
+
} // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index b32f635cc0..105062ee06 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -23,7 +23,6 @@ public:
private:
void ExecuteDataTx(TOperation::TPtr op,
- TTransactionContext& txc,
const TActorContext& ctx);
void AddLocksToResult(TOperation::TPtr op);
};
@@ -124,7 +123,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
try {
try {
- ExecuteDataTx(op, txc, ctx);
+ ExecuteDataTx(op, ctx);
} catch (const TNotReadyTabletException&) {
// We want to try pinning (actually precharging) all required pages
// before restarting the transaction, to minimize future restarts.
@@ -169,7 +168,6 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
- TTransactionContext& txc,
const TActorContext& ctx) {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
IEngineFlat* engine = tx->GetDataTx()->GetEngine();
@@ -237,9 +235,6 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
} else {
result->SetTxResult(engine->GetShardReply(DataShard.TabletID()));
- if (op->IsImmediate() && !op->IsReadOnly())
- DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
-
if (auto changes = tx->GetDataTx()->GetCollectedChanges()) {
op->ChangeRecords().reserve(changes.size());
for (const auto& change : changes) {
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index ed5d912a41..1fd5f15f01 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -183,10 +183,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
AddLocksToResult(op, ctx);
- if (op->IsImmediate() && !op->IsReadOnly()) {
- DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
- }
-
if (auto changes = dataTx->GetCollectedChanges()) {
op->ChangeRecords().reserve(changes.size());
for (const auto& change : changes) {
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index de0e9e4ad2..d468922853 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -20,6 +20,7 @@ public:
const TActorContext &ctx) override;
private:
+ TDataShard::TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(TOperation* op, TTransactionContext& txc);
void CompleteRequest(TOperation::TPtr op,
const TActorContext &ctx);
void AddDiagnosticsResult(TOutputOpData::TResultPtr &res);
@@ -43,6 +44,27 @@ bool TFinishProposeUnit::IsReadyToExecute(TOperation::TPtr) const
return true;
}
+TDataShard::TPromotePostExecuteEdges TFinishProposeUnit::PromoteImmediatePostExecuteEdges(
+ TOperation* op,
+ TTransactionContext& txc)
+{
+ if (op->IsMvccSnapshotRead()) {
+ if (op->IsMvccSnapshotRepeatable()) {
+ return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::RepeatableRead, txc);
+ } else {
+ return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
+ }
+ } else if (op->MvccReadWriteVersion) {
+ if (op->IsReadOnly()) {
+ return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
+ } else {
+ return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ }
+ } else {
+ return { };
+ }
+}
+
EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
TTransactionContext &txc,
const TActorContext &ctx)
@@ -53,24 +75,18 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
bool hadWrites = false;
// When mvcc is enabled we perform marking after transaction is executed
- if (DataShard.IsMvccEnabled() && op->IsImmediate()) {
- if (op->IsMvccSnapshotRead()) {
- hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(op->GetMvccSnapshot(), txc);
- if (op->IsMvccSnapshotRepeatable()) {
- hadWrites |= DataShard.PromoteCompleteEdge(op.Get(), txc);
- if (!hadWrites && DataShard.GetSnapshotManager().GetCommittedCompleteEdge() < op->GetMvccSnapshot()) {
- // We need to wait for completion because some other transaction
- // has moved complete edge, but it's not committed yet.
- op->SetWaitCompletionFlag(true);
- }
- }
- } else if (op->MvccReadWriteVersion) {
- hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(*op->MvccReadWriteVersion, txc);
+ if (op->IsAborted()) {
+ // Make sure we confirm aborts with a commit
+ op->SetWaitCompletionFlag(true);
+ } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) {
+ auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc);
+
+ if (res.HadWrites) {
+ hadWrites = true;
+ res.WaitCompletion = true;
}
- if (hadWrites) {
- // FIXME: even if transaction itself didn't promote, we may still need to
- // wait for completion, when current in-memory state is not actually committed
+ if (res.WaitCompletion) {
op->SetWaitCompletionFlag(true);
}
}
@@ -159,8 +175,16 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size());
- if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId()))
- ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie());
+ if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
+ if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) {
+ DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie());
+ } else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) {
+ // TODO: we should actually measure a read timestamp and use it here
+ DataShard.SendImmediateReadResult(op->GetTarget(), res.Release(), op->GetCookie());
+ } else {
+ ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie());
+ }
+ }
}
void TFinishProposeUnit::AddDiagnosticsResult(TOutputOpData::TResultPtr &res)
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index 851ff5424b..7dc3a46420 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -15,6 +15,9 @@
namespace NKikimr {
+// We will unsubscribe from idle coordinators after 5 minutes
+static constexpr TDuration MaxIdleCoordinatorSubscriptionTime = TDuration::Minutes(5);
+
ui64 TMediatorTimecastEntry::Get(ui64 tabletId) const {
Y_UNUSED(tabletId);
return AtomicGet(Step);
@@ -72,9 +75,29 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
ui32 RefCount = 0;
};
+ struct TCoordinatorSubscriber {
+ TSet<ui64> Coordinators;
+ };
+
+ struct TCoordinator {
+ TMediatorTimecastReadStep::TPtr ReadStep = new TMediatorTimecastReadStep;
+ TActorId PipeClient;
+ ui64 LastSentSeqNo = 0;
+ ui64 LastConfirmedSeqNo = 0;
+ ui64 LastObservedReadStep = 0;
+ THashSet<TActorId> Subscribers;
+ TMap<std::pair<ui64, TActorId>, ui64> SubscribeRequests; // (seqno, subscriber) -> Cookie
+ TMap<std::pair<ui64, TActorId>, ui64> WaitRequests; // (step, subscriber) -> Cookie
+ TMonotonic IdleStart;
+ };
+
THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
THashMap<ui64, TTabletInfo> Tablets;
+ ui64 LastSeqNo = 0;
+ THashMap<ui64, TCoordinator> Coordinators;
+ THashMap<TActorId, TCoordinatorSubscriber> CoordinatorSubscribers;
+
TMediator& MediatorInfo(ui64 mediator, const NKikimrSubDomains::TProcessingParams &processing) {
auto pr = Mediators.try_emplace(mediator, processing.GetTimeCastBucketsPerMediator());
if (!pr.second) {
@@ -118,19 +141,24 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
}
void TryResync(const TActorId &pipeClient, ui64 tabletId, const TActorContext &ctx) {
- for (auto &xpair : Mediators) {
- const ui64 mediatorTabletId = xpair.first;
- TMediator &mediator = xpair.second;
-
- if (mediator.PipeClient == pipeClient) {
- Y_VERIFY(tabletId == mediatorTabletId);
- mediator.PipeClient = TActorId();
- RegisterMediator(mediatorTabletId, mediator, ctx);
- return;
- }
+ ResyncCoordinator(tabletId, pipeClient, ctx);
+
+ auto it = Mediators.find(tabletId);
+ if (it == Mediators.end()) {
+ return;
+ }
+
+ TMediator &mediator = it->second;
+ if (mediator.PipeClient == pipeClient) {
+ mediator.PipeClient = TActorId();
+ RegisterMediator(tabletId, mediator, ctx);
}
}
+ void SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
+ void ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx);
+ void NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
+
void Handle(TEvMediatorTimecast::TEvRegisterTablet::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvUnregisterTablet::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &ev, const TActorContext &ctx);
@@ -138,6 +166,15 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);
+ // Client requests for readstep subscriptions
+ void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx);
+
+ // Coordinator replies for readstep subscriptions
+ void Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_MEDIATOR_TIMECAST_ACTOR;
@@ -154,6 +191,13 @@ public:
HFunc(TEvMediatorTimecast::TEvWaitPlanStep, Handle);
HFunc(TEvMediatorTimecast::TEvUpdate, Handle);
+ HFunc(TEvMediatorTimecast::TEvSubscribeReadStep, Handle);
+ HFunc(TEvMediatorTimecast::TEvUnsubscribeReadStep, Handle);
+ HFunc(TEvMediatorTimecast::TEvWaitReadStep, Handle);
+
+ HFunc(TEvTxProxy::TEvSubscribeReadStepResult, Handle);
+ HFunc(TEvTxProxy::TEvSubscribeReadStepUpdate, Handle);
+
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
}
@@ -280,13 +324,13 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
default: {
const ui64 step = record.GetTimeBarrier();
bucket.Entry->Update(step, nullptr, 0);
- THashSet<ui64> processed; // a set of processed tablets
+ THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
while (!bucket.Waiters.empty()) {
const auto& top = bucket.Waiters.top();
if (step < top.PlanStep) {
break;
}
- if (processed.insert(top.TabletId).second) {
+ if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
}
bucket.Waiters.pop();
@@ -297,6 +341,224 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
}
}
+void TMediatorTimecastProxy::SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {
+ const ui64 seqNo = LastSeqNo;
+
+ if (!coordinator.PipeClient) {
+ ui64 maxDelay = 100 + TAppData::RandomProvider->GenRand64() % 50;
+ auto retryPolicy = NTabletPipe::TClientRetryPolicy{
+ .RetryLimitCount = 6 /* delays: 0, 10, 20, 40, 80, 100-150 */,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::MilliSeconds(maxDelay),
+ };
+ coordinator.PipeClient = ctx.RegisterWithSameMailbox(
+ NTabletPipe::CreateClient(ctx.SelfID, coordinatorId, retryPolicy));
+ }
+
+ coordinator.LastSentSeqNo = seqNo;
+ NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvSubscribeReadStep(coordinatorId, seqNo));
+}
+
+void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx) {
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+
+ auto &coordinator = itCoordinator->second;
+ if (coordinator.PipeClient != pipeClient) {
+ return;
+ }
+
+ coordinator.PipeClient = TActorId();
+ if (coordinator.Subscribers.empty()) {
+ // Just forget disconnected idle coordinators
+ Coordinators.erase(itCoordinator);
+ return;
+ }
+
+ ++LastSeqNo;
+ SyncCoordinator(coordinatorId, coordinator, ctx);
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ const ui64 coordinatorId = msg->CoordinatorId;
+ auto &subscriber = CoordinatorSubscribers[ev->Sender];
+ auto &coordinator = Coordinators[coordinatorId];
+ subscriber.Coordinators.insert(coordinatorId);
+ coordinator.Subscribers.insert(ev->Sender);
+ ui64 seqNo = ++LastSeqNo;
+ auto key = std::make_pair(seqNo, ev->Sender);
+ coordinator.SubscribeRequests[key] = ev->Cookie;
+ SyncCoordinator(coordinatorId, coordinator, ctx);
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ auto &subscriber = CoordinatorSubscribers[ev->Sender];
+ if (msg->CoordinatorId == 0) {
+ // Unsubscribe from all coordinators
+ for (ui64 coordinatorId : subscriber.Coordinators) {
+ auto &coordinator = Coordinators[coordinatorId];
+ coordinator.Subscribers.erase(ev->Sender);
+ if (coordinator.Subscribers.empty()) {
+ coordinator.IdleStart = ctx.Monotonic();
+ }
+ }
+ subscriber.Coordinators.clear();
+ } else if (subscriber.Coordinators.contains(msg->CoordinatorId)) {
+ // Unsubscribe from specific coordinator
+ auto &coordinator = Coordinators[msg->CoordinatorId];
+ coordinator.Subscribers.erase(ev->Sender);
+ if (coordinator.Subscribers.empty()) {
+ coordinator.IdleStart = ctx.Monotonic();
+ }
+ subscriber.Coordinators.erase(msg->CoordinatorId);
+ }
+
+ if (subscriber.Coordinators.empty()) {
+ // Don't track unnecessary subscribers
+ CoordinatorSubscribers.erase(ev->Sender);
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ const ui64 coordinatorId = msg->CoordinatorId;
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ if (!coordinator.Subscribers.contains(ev->Sender)) {
+ return;
+ }
+
+ const ui64 step = coordinator.LastObservedReadStep;
+ const ui64 waitReadStep = msg->ReadStep;
+ if (waitReadStep <= step) {
+ ctx.Send(ev->Sender,
+ new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step),
+ 0, ev->Cookie);
+ return;
+ }
+
+ auto key = std::make_pair(waitReadStep, ev->Sender);
+ coordinator.WaitRequests[key] = ev->Cookie;
+}
+
+void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx) {
+ const auto &record = ev->Get()->Record;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE TEvSubscribeReadStepResult " << record.ShortDebugString());
+
+ const ui64 coordinatorId = record.GetCoordinatorID();
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ bool updated = false;
+ const ui64 nextReadStep = record.GetNextAcquireStep();
+ if (coordinator.LastObservedReadStep < nextReadStep) {
+ coordinator.LastObservedReadStep = nextReadStep;
+ coordinator.ReadStep->Update(nextReadStep);
+ updated = true;
+ }
+
+ const ui64 seqNo = record.GetSeqNo();
+ if (coordinator.LastConfirmedSeqNo < seqNo) {
+ coordinator.LastConfirmedSeqNo = seqNo;
+
+ const ui64 lastReadStep = record.GetLastAcquireStep();
+ for (auto it = coordinator.SubscribeRequests.begin(); it != coordinator.SubscribeRequests.end();) {
+ const ui64 waitSeqNo = it->first.first;
+ if (seqNo < waitSeqNo) {
+ break;
+ }
+ const TActorId subscriberId = it->first.second;
+ const ui64 cookie = it->second;
+ if (coordinator.Subscribers.contains(subscriberId)) {
+ ctx.Send(subscriberId,
+ new TEvMediatorTimecast::TEvSubscribeReadStepResult(
+ coordinatorId,
+ lastReadStep,
+ nextReadStep,
+ coordinator.ReadStep),
+ 0, cookie);
+ }
+ it = coordinator.SubscribeRequests.erase(it);
+ }
+ }
+
+ if (updated) {
+ NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx);
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx) {
+ const auto &record = ev->Get()->Record;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE TEvSubscribeReadStepUpdate " << record.ShortDebugString());
+
+ const ui64 coordinatorId = record.GetCoordinatorID();
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ const ui64 nextReadStep = record.GetNextAcquireStep();
+ if (coordinator.LastObservedReadStep < nextReadStep) {
+ coordinator.LastObservedReadStep = nextReadStep;
+ coordinator.ReadStep->Update(nextReadStep);
+
+ NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx);
+ }
+
+ // Unsubscribe from idle coordinators
+ if (coordinator.Subscribers.empty() && (ctx.Monotonic() - coordinator.IdleStart) >= MaxIdleCoordinatorSubscriptionTime) {
+ if (coordinator.PipeClient) {
+ NTabletPipe::CloseClient(ctx, coordinator.PipeClient);
+ coordinator.PipeClient = TActorId();
+ }
+
+ Coordinators.erase(itCoordinator);
+ }
+}
+
+void TMediatorTimecastProxy::NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {
+ const ui64 step = coordinator.LastObservedReadStep;
+ for (auto it = coordinator.WaitRequests.begin(); it != coordinator.WaitRequests.end();) {
+ const ui64 waitStep = it->first.first;
+ if (step < waitStep) {
+ break;
+ }
+ const TActorId subscriberId = it->first.second;
+ const ui64 cookie = it->second;
+ if (coordinator.Subscribers.contains(subscriberId)) {
+ ctx.Send(subscriberId,
+ new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step),
+ 0, cookie);
+ }
+ it = coordinator.WaitRequests.erase(it);
+ }
+}
+
+
+
IActor* CreateMediatorTimecastProxy() {
return new TMediatorTimecastProxy();
}
diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h
index 1202d5a799..0765eae9c4 100644
--- a/ydb/core/tx/time_cast/time_cast.h
+++ b/ydb/core/tx/time_cast/time_cast.h
@@ -21,15 +21,41 @@ public:
void Update(ui64 step, ui64 *exemption, ui64 exsz);
};
+class TMediatorTimecastReadStep : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<TMediatorTimecastReadStep>;
+ using TCPtr = TIntrusiveConstPtr<TMediatorTimecastReadStep>;
+
+ TMediatorTimecastReadStep(ui64 nextReadStep = 0)
+ : NextReadStep{ nextReadStep }
+ { }
+
+ ui64 Get() const {
+ return NextReadStep.load();
+ }
+
+ void Update(ui64 nextReadStep) {
+ NextReadStep.store(nextReadStep);
+ }
+
+private:
+ std::atomic<ui64> NextReadStep;
+};
+
struct TEvMediatorTimecast {
enum EEv {
// local part
EvRegisterTablet = EventSpaceBegin(TKikimrEvents::ES_TX_MEDIATORTIMECAST),
EvUnregisterTablet,
EvWaitPlanStep,
+ EvSubscribeReadStep,
+ EvUnsubscribeReadStep,
+ EvWaitReadStep,
EvRegisterTabletResult = EvRegisterTablet + 1 * 512,
EvNotifyPlanStep,
+ EvSubscribeReadStepResult,
+ EvNotifyReadStep,
// mediator part
EvWatch = EvRegisterTablet + 2 * 512,
@@ -138,6 +164,104 @@ struct TEvMediatorTimecast {
}
};
+ struct TEvSubscribeReadStep : public TEventLocal<TEvSubscribeReadStep, EvSubscribeReadStep> {
+ const ui64 CoordinatorId;
+
+ explicit TEvSubscribeReadStep(ui64 coordinatorId)
+ : CoordinatorId(coordinatorId)
+ {
+ Y_VERIFY(coordinatorId != 0);
+ }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " }";
+ }
+ };
+
+ struct TEvUnsubscribeReadStep : public TEventLocal<TEvUnsubscribeReadStep, EvUnsubscribeReadStep> {
+ const ui64 CoordinatorId;
+
+ explicit TEvUnsubscribeReadStep(ui64 coordinatorId = 0)
+ : CoordinatorId(coordinatorId)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " }";
+ }
+ };
+
+ struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> {
+ const ui64 CoordinatorId;
+ const ui64 LastReadStep;
+ const ui64 NextReadStep;
+ const TMediatorTimecastReadStep::TCPtr ReadStep;
+
+ TEvSubscribeReadStepResult(
+ ui64 coordinatorId,
+ ui64 lastReadStep,
+ ui64 nextReadStep,
+ TMediatorTimecastReadStep::TCPtr readStep)
+ : CoordinatorId(coordinatorId)
+ , LastReadStep(lastReadStep)
+ , NextReadStep(nextReadStep)
+ , ReadStep(std::move(readStep))
+ {
+ Y_VERIFY(ReadStep);
+ }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " LastReadStep# " << LastReadStep
+ << " NextReadStep# " << NextReadStep
+ << " ReadStep# " << ReadStep->Get()
+ << " }";
+ }
+ };
+
+ struct TEvWaitReadStep : public TEventLocal<TEvWaitReadStep, EvWaitReadStep> {
+ const ui64 CoordinatorId;
+ const ui64 ReadStep;
+
+ TEvWaitReadStep(ui64 coordinatorId, ui64 readStep)
+ : CoordinatorId(coordinatorId)
+ , ReadStep(readStep)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " ReadStep# " << ReadStep
+ << " }";
+ }
+ };
+
+ struct TEvNotifyReadStep : public TEventLocal<TEvNotifyReadStep, EvNotifyReadStep> {
+ const ui64 CoordinatorId;
+ const ui64 ReadStep;
+
+ TEvNotifyReadStep(ui64 coordinatorId, ui64 readStep)
+ : CoordinatorId(coordinatorId)
+ , ReadStep(readStep)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " ReadStep# " << ReadStep
+ << " }";
+ }
+ };
+
struct TEvWatch : public TEventPB<TEvWatch, NKikimrTxMediatorTimecast::TEvWatch, EvWatch> {
TEvWatch()
{}
diff --git a/ydb/core/tx/time_cast/time_cast_ut.cpp b/ydb/core/tx/time_cast/time_cast_ut.cpp
new file mode 100644
index 0000000000..0acb2ee128
--- /dev/null
+++ b/ydb/core/tx/time_cast/time_cast_ut.cpp
@@ -0,0 +1,84 @@
+#include "time_cast.h"
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/testlib/test_client.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NMediatorTimeCastTest {
+
+ using namespace Tests;
+
+ Y_UNIT_TEST_SUITE(MediatorTimeCast) {
+
+ void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
+ auto sender = runtime.AllocateEdgeActor();
+ runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration);
+ runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
+ }
+
+ void SendSubscribeRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 cookie = 0) {
+ auto request = MakeHolder<TEvMediatorTimecast::TEvSubscribeReadStep>(coordinatorId);
+ runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true);
+ }
+
+ TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr WaitSubscribeResult(TTestActorRuntime& runtime, const TActorId& sender) {
+ return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvSubscribeReadStepResult>(sender);
+ }
+
+ void SendWaitRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 readStep, ui64 cookie = 0) {
+ auto request = MakeHolder<TEvMediatorTimecast::TEvWaitReadStep>(coordinatorId, readStep);
+ runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true);
+ }
+
+ TEvMediatorTimecast::TEvNotifyReadStep::TPtr WaitNotifyResult(TTestActorRuntime& runtime, const TActorId& sender) {
+ return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvNotifyReadStep>(sender);
+ }
+
+ Y_UNIT_TEST(ReadStepSubscribe) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+
+ auto &runtime = *server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NActors::NLog::PRI_DEBUG);
+
+ auto sender = runtime.AllocateEdgeActor();
+ ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+
+ SendSubscribeRequest(runtime, sender, coordinatorId);
+ auto result = WaitSubscribeResult(runtime, sender);
+ auto stepPtr = result->Get()->ReadStep;
+
+ ui64 readStep1 = stepPtr->Get();
+ UNIT_ASSERT_GE(readStep1, result->Get()->LastReadStep);
+
+ SimulateSleep(runtime, TDuration::Seconds(5));
+
+ ui64 readStep2 = stepPtr->Get();
+ UNIT_ASSERT_GT(readStep2, readStep1);
+
+ ui64 waitStep = readStep2 + 2000;
+ SendWaitRequest(runtime, sender, coordinatorId, waitStep);
+
+ {
+ auto notify = WaitNotifyResult(runtime, sender);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get());
+ }
+
+ ui64 waitStep2 = stepPtr->Get() + 5000;
+ RebootTablet(runtime, coordinatorId, sender);
+ SendWaitRequest(runtime, sender, coordinatorId, waitStep2);
+
+ {
+ auto notify = WaitNotifyResult(runtime, sender);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep2);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get());
+ }
+ }
+
+ } // Y_UNIT_TEST_SUITE(MediatorTimeCast)
+
+} // namespace NKikimr::NMediatorTimeCastTest
diff --git a/ydb/core/tx/time_cast/ut/ya.make b/ydb/core/tx/time_cast/ut/ya.make
new file mode 100644
index 0000000000..1f055a3e2e
--- /dev/null
+++ b/ydb/core/tx/time_cast/ut/ya.make
@@ -0,0 +1,26 @@
+UNITTEST_FOR(ydb/core/tx/time_cast)
+
+OWNER(g:kikimr)
+
+IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ REQUIREMENTS(ram:16)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+PEERDIR(
+ ydb/core/testlib
+ ydb/core/tx
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ time_cast_ut.cpp
+)
+
+END()
diff --git a/ydb/core/tx/time_cast/ya.make b/ydb/core/tx/time_cast/ya.make
index 9c059ac2b9..e8c260833e 100644
--- a/ydb/core/tx/time_cast/ya.make
+++ b/ydb/core/tx/time_cast/ya.make
@@ -18,3 +18,7 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/core/util/testactorsys.cpp b/ydb/core/util/testactorsys.cpp
index 73d27bc9d9..4a270e4f8c 100644
--- a/ydb/core/util/testactorsys.cpp
+++ b/ydb/core/util/testactorsys.cpp
@@ -208,4 +208,12 @@ TIntrusivePtr<ITimeProvider> TTestActorSystem::CreateTimeProvider() {
return MakeIntrusive<TTestActorTimeProvider>();
}
+TIntrusivePtr<IMonotonicTimeProvider> TTestActorSystem::CreateMonotonicTimeProvider() {
+ class TTestActorMonotonicTimeProvider : public IMonotonicTimeProvider {
+ public:
+ TMonotonic Now() override { return TMonotonic::MicroSeconds(CurrentTestActorSystem->Clock.MicroSeconds()); }
+ };
+ return MakeIntrusive<TTestActorMonotonicTimeProvider>();
+}
+
}
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index ce6ad7be10..722d5a8163 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -187,6 +187,8 @@ public:
Y_VERIFY(!CurrentTestActorSystem);
CurrentTestActorSystem = this;
+
+ AppData.MonotonicTimeProvider = CreateMonotonicTimeProvider();
}
~TTestActorSystem() {
@@ -195,6 +197,7 @@ public:
}
static TIntrusivePtr<ITimeProvider> CreateTimeProvider();
+ static TIntrusivePtr<IMonotonicTimeProvider> CreateMonotonicTimeProvider();
TAppData *GetAppData() {
return &AppData;