aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-06-08 10:33:52 +0300
committersnaury <snaury@ydb.tech>2023-06-08 10:33:52 +0300
commit81d39471f2e9dc990e644f2f0547560e65f092b4 (patch)
treea2684fc02599a548e9d9b7d34555704182434de8
parentb62949a9443463174da1e380a0403934d8030cbe (diff)
downloadydb-81d39471f2e9dc990e644f2f0547560e65f092b4.tar.gz
Support read iterator repeatable read on followers
-rw-r--r--ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp2
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/check_data_tx_unit.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard.cpp17
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp207
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp464
-rw-r--r--ydb/core/tx/datashard/datashard_change_receiving.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_dep_tracker.cpp11
-rw-r--r--ydb/core/tx/datashard/datashard_dep_tracker.h17
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h21
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp29
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.cpp158
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.h7
-rw-r--r--ydb/core/tx/datashard/datashard_split_dst.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp42
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/follower_edge.cpp98
-rw-r--r--ydb/core/tx/datashard/probes.h4
-rw-r--r--ydb/core/tx/datashard/read_iterator.h16
23 files changed, 741 insertions, 382 deletions
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
index 05819756ef8..9924f1ddb43 100644
--- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
@@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
- return issue.GetMessage().Contains("bellow low watermark");
+ return issue.GetMessage().Contains("has no snapshot at");
}), result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 4971a76c62d..d43054d9461 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -453,4 +453,5 @@ enum ETxTypes {
TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}];
TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}];
TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}];
+ TXTYPE_UPDATE_FOLLOWER_READ_EDGE = 78 [(TxTypeOpts) = {Name: "TxUpdateFollowerReadEdge"}];
}
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 28c5b364bbe..c31e6b4656b 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -269,6 +269,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 41bb30c51fd..6ec99292a69 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 41bb30c51fd..6ec99292a69 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index a7eb4c861d0..0c73d36c2e5 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp
diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp
index 53172927bc5..8f2c4518615 100644
--- a/ydb/core/tx/datashard/check_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp
@@ -86,7 +86,19 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op,
if (tx->IsMvccSnapshotRead()) {
auto snapshot = tx->GetMvccSnapshot();
- if (!DataShard.IsMvccEnabled()) {
+ if (DataShard.IsFollower()) {
+ TString err = TStringBuilder()
+ << "Operation " << *op << " cannot read from snapshot " << snapshot
+ << " using data tx on a follower " << DataShard.TabletID();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ } else if (!DataShard.IsMvccEnabled()) {
TString err = TStringBuilder()
<< "Operation " << *op << " reads from snapshot " << snapshot
<< " with MVCC feature disabled at " << DataShard.TabletID();
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 2dc2c39d736..960457cb2ad 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1885,6 +1885,9 @@ bool TDataShard::IsMvccEnabled() const {
}
TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
+ if (IsFollower())
+ return {TRowVersion::Max(), TRowVersion::Max()};
+
if (!IsMvccEnabled())
return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()};
@@ -1983,6 +1986,10 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
}
TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
+ if (IsFollower()) {
+ return {TRowVersion::Max(), TRowVersion::Max()};
+ }
+
if (!IsMvccEnabled())
return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()};
@@ -2000,6 +2007,8 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges(
const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc)
{
+ Y_VERIFY(!IsFollower(), "Unexpected attempt to promote edges on a follower");
+
TPromotePostExecuteEdges res;
res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc);
@@ -2050,6 +2059,10 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc);
}
res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc);
+ if (res.HadWrites) {
+ // Promoting write edges may promote read edge
+ PromoteFollowerReadEdge(txc);
+ }
break;
}
}
@@ -2198,6 +2211,10 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
}
break;
}
+
+ if (IsMvccEnabled()) {
+ PromoteFollowerReadEdge();
+ }
}
void TDataShard::CheckMediatorStateRestored() {
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index d609d6813ea..28d7730929b 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -146,8 +146,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
#define PRECHARGE_SYS_TABLE(table) \
{ \
if (txc.DB.GetScheme().GetTableInfo(table::TableId)) { \
- auto rowset = db.Table<table>().Range().Select(); \
- ready &= rowset.IsReady(); \
+ ready &= db.Table<table>().Precharge(); \
} \
}
@@ -689,91 +688,149 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
auto* userTablesSchema = scheme.GetTableInfo(Schema::UserTables::TableId);
Y_VERIFY(userTablesSchema, "UserTables");
- // Check if user tables schema has changed since last time we synchronized it
+ // Check if tables changed since last time we synchronized them
+ ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial;
ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial;
- if (lastSchemeUpdate > FollowerState.LastSchemeUpdate) {
- NIceDb::TNiceDb db(txc.DB);
- {
- LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD,
- "Updating tables metadata on follower, tabletId %" PRIu64
- " prevGen %" PRIu64 " prevStep %" PRIu64 " newGen %" PRIu64 " newStep %" PRIu64,
- TabletID(), FollowerState.LastSchemeUpdate >> 32,
- FollowerState.LastSchemeUpdate & (ui32)-1,
- lastSchemeUpdate >> 32, lastSchemeUpdate & (ui32)-1);
-
- // Reload user tables metadata
- TableInfos.clear();
-
- if (userTablesSchema->Columns.contains(Schema::UserTables::ShadowTid::ColumnId)) {
- // New schema with ShadowTid column
- auto rowset = db.Table<Schema::UserTables>().Select<
- Schema::UserTables::Tid,
- Schema::UserTables::LocalTid,
- Schema::UserTables::Schema,
- Schema::UserTables::ShadowTid>();
- if (!rowset.IsReady())
+ ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId)
+ ? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0;
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ bool precharged = true;
+ bool updated = false;
+ if (FollowerState.LastSysUpdate < lastSysUpdate) {
+ if (!db.Table<Schema::Sys>().Precharge()) {
+ precharged = false;
+ }
+ updated = true;
+ }
+ if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
+ if (!db.Table<Schema::UserTables>().Precharge()) {
+ precharged = false;
+ }
+ updated = true;
+ }
+ if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
+ if (!db.Table<Schema::Snapshots>().Precharge()) {
+ precharged = false;
+ }
+ updated = true;
+ }
+
+ if (!updated) {
+ return true;
+ }
+
+ if (!precharged) {
+ return false;
+ }
+
+ if (FollowerState.LastSysUpdate < lastSysUpdate) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Updating sys metadata on follower, tabletId " << TabletID()
+ << " prevGen " << (FollowerState.LastSysUpdate >> 32)
+ << " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1)
+ << " newGen " << (lastSysUpdate >> 32)
+ << " newStep " << (lastSysUpdate & (ui32)-1));
+
+ bool ready = true;
+ ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId);
+ ready &= SysGetUi64(db, Schema::Sys_CurrentSchemeShardId, CurrentSchemeShardId);
+ ready &= SnapshotManager.ReloadSys(db);
+ if (!ready) {
+ return false;
+ }
+
+ FollowerState.LastSysUpdate = lastSysUpdate;
+ }
+
+ if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Updating tables metadata on follower, tabletId " << TabletID()
+ << " prevGen " << (FollowerState.LastSchemeUpdate >> 32)
+ << " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1)
+ << " newGen " << (lastSchemeUpdate >> 32)
+ << " newStep " << (lastSchemeUpdate & (ui32)-1));
+
+ struct TRow {
+ TPathId TableId;
+ TUserTable::TPtr Table;
+ };
+
+ std::vector<TRow> tables;
+
+ if (userTablesSchema->Columns.contains(Schema::UserTables::ShadowTid::ColumnId)) {
+ // New schema with ShadowTid column
+ auto rowset = db.Table<Schema::UserTables>().Select<
+ Schema::UserTables::Tid,
+ Schema::UserTables::LocalTid,
+ Schema::UserTables::Schema,
+ Schema::UserTables::ShadowTid>();
+ if (!rowset.IsReady())
+ return false;
+ while (!rowset.EndOfSet()) {
+ ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>();
+ ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>();
+ ui32 shadowTid = rowset.GetValueOrDefault<Schema::UserTables::ShadowTid>();
+ TString schema = rowset.GetValue<Schema::UserTables::Schema>();
+ NKikimrSchemeOp::TTableDescription descr;
+ bool parseOk = ParseFromStringNoSizeLimit(descr, schema);
+ Y_VERIFY(parseOk);
+ tables.push_back(TRow{
+ TPathId(GetPathOwnerId(), tableId),
+ new TUserTable(localTid, descr, shadowTid),
+ });
+ if (!rowset.Next())
return false;
- while (!rowset.EndOfSet()) {
- ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>();
- ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>();
- ui32 shadowTid = rowset.GetValueOrDefault<Schema::UserTables::ShadowTid>();
- TString schema = rowset.GetValue<Schema::UserTables::Schema>();
- NKikimrSchemeOp::TTableDescription descr;
- bool parseOk = ParseFromStringNoSizeLimit(descr, schema);
- Y_VERIFY(parseOk);
- AddUserTable(TPathId(GetPathOwnerId(), tableId), new TUserTable(localTid, descr, shadowTid));
- if (!rowset.Next())
- return false;
- }
- } else {
- // Older schema without ShadowTid column
- auto rowset = db.Table<Schema::UserTables>().Select<
- Schema::UserTables::Tid,
- Schema::UserTables::LocalTid,
- Schema::UserTables::Schema>();
- if (!rowset.IsReady())
+ }
+ } else {
+ // Older schema without ShadowTid column
+ auto rowset = db.Table<Schema::UserTables>().Select<
+ Schema::UserTables::Tid,
+ Schema::UserTables::LocalTid,
+ Schema::UserTables::Schema>();
+ if (!rowset.IsReady())
+ return false;
+ while (!rowset.EndOfSet()) {
+ ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>();
+ ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>();
+ ui32 shadowTid = 0;
+ TString schema = rowset.GetValue<Schema::UserTables::Schema>();
+ NKikimrSchemeOp::TTableDescription descr;
+ bool parseOk = ParseFromStringNoSizeLimit(descr, schema);
+ Y_VERIFY(parseOk);
+ tables.push_back(TRow{
+ TPathId(GetPathOwnerId(), tableId),
+ new TUserTable(localTid, descr, shadowTid),
+ });
+ if (!rowset.Next())
return false;
- while (!rowset.EndOfSet()) {
- ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>();
- ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>();
- ui32 shadowTid = 0;
- TString schema = rowset.GetValue<Schema::UserTables::Schema>();
- NKikimrSchemeOp::TTableDescription descr;
- bool parseOk = ParseFromStringNoSizeLimit(descr, schema);
- Y_VERIFY(parseOk);
- AddUserTable(TPathId(GetPathOwnerId(), tableId), new TUserTable(localTid, descr, shadowTid));
- if (!rowset.Next())
- return false;
- }
}
}
+
+ TableInfos.clear();
+ for (auto& table : tables) {
+ AddUserTable(table.TableId, std::move(table.Table));
+ }
+
FollowerState.LastSchemeUpdate = lastSchemeUpdate;
}
// N.B. follower with snapshots support may be loaded in datashard without a snapshots table
- if (scheme.GetTableInfo(Schema::Snapshots::TableId)) {
- ui64 lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId).Serial;
- if (lastSnapshotsUpdate > FollowerState.LastSnapshotsUpdate) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
- "Updating snapshots metadata on follower, tabletId " << TabletID()
- << " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
- << " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
- << " newGen " << (lastSnapshotsUpdate >> 32)
- << " newStep " << (lastSnapshotsUpdate & (ui32)-1));
-
- NIceDb::TNiceDb db(txc.DB);
- if (!SnapshotManager.Reload(db)) {
- return false;
- }
+ if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Updating snapshots metadata on follower, tabletId " << TabletID()
+ << " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
+ << " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
+ << " newGen " << (lastSnapshotsUpdate >> 32)
+ << " newStep " << (lastSnapshotsUpdate & (ui32)-1));
- FollowerState.LastSnapshotsUpdate = lastSnapshotsUpdate;
+ NIceDb::TNiceDb db(txc.DB);
+ if (!SnapshotManager.ReloadSnapshots(db)) {
+ return false;
}
- // Initialize PathOwnerId (required for snapshot keys)
- if (PathOwnerId == INVALID_TABLET_ID) {
- NIceDb::TNiceDb db(txc.DB);
- LOAD_SYS_UI64(db, Schema::Sys_PathOwnerId, PathOwnerId);
- }
+ FollowerState.LastSnapshotsUpdate = lastSnapshotsUpdate;
}
return true;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index e47d076a550..0852e9e2eff 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1046,7 +1046,7 @@ public:
}
if (!snapshotFound) {
- bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
if (!isMvccReadable) {
SetStatusError(
Result->Record,
@@ -1120,32 +1120,51 @@ public:
TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead;
- if (state.IsHeadRead && !Self->IsFollower()) {
+ if (state.IsHeadRead) {
bool hasError = !Result || Result->Record.HasStatus();
if (!hasError && Reader->HasUnreadQueries()) {
- // we failed to read all at once and also there might be dependency
- // we need to wait for: after its execution we can read MVCC snapshot
+ // We failed to read everything in a single transaction
+ // We would prefer to return current result and continue reading,
+ // but we may have to retry at a different version or wait for
+ // additional dependencies before retrying.
state.IsHeadRead = false;
- // repeatable read
- SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);
+ if (!Self->IsFollower()) {
+ // Switch to repeatable read at the same version
+ SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);
- TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId);
- const auto& plannedOps = Self->Pipeline.GetActivePlannedOps();
- auto it = plannedOps.lower_bound(order);
- if (it != plannedOps.end() && it->first == order) {
- if (!it->second->IsReadOnly()) {
- // we need to wait this op
- AddDependency(it->second);
+ TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId);
+ const auto& plannedOps = Self->Pipeline.GetActivePlannedOps();
+ auto it = plannedOps.lower_bound(order);
+ if (it != plannedOps.end() && it->first == order) {
+ if (!it->second->IsReadOnly()) {
+ // we need to wait this op
+ AddDependency(it->second);
- // just for sanity: result should not contain anything at this step
+ // Make sure current incomplete result will not be sent
+ Result.reset(new TEvDataShard::TEvReadResult());
+
+ return EExecutionStatus::Continue;
+ }
+ }
+ } else {
+ auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge();
+ auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev();
+ if (maxRepeatable >= Self->GetSnapshotManager().GetLowWatermark() && maxRepeatable < state.ReadVersion) {
+ // We need to retry at a different version
+ state.ReadVersion = maxRepeatable;
+ SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);
+
+ // Make sure current incomplete result will not be sent
Result.reset(new TEvDataShard::TEvReadResult());
- return EExecutionStatus::Continue;
+ return EExecutionStatus::Reschedule;
}
}
+
+ // We will send current incomplete result and continue reading from snapshot
} else {
- // either error or full read done
+ // Either error or a complete result
readType = TDataShard::EPromotePostExecuteEdges::ReadOnly;
}
}
@@ -1194,11 +1213,6 @@ public:
const auto& record = Request->Record;
- state.ReadId = record.GetReadId();
- state.PathId = TPathId(
- record.GetTableId().GetOwnerId(),
- record.GetTableId().GetTableId());
-
if (record.HasMaxRows())
state.Quota.Rows = record.GetMaxRows();
@@ -1211,94 +1225,25 @@ public:
if (record.HasMaxRowsInResult())
state.MaxRowsInResult = record.GetMaxRowsInResult();
- if (record.HasSnapshot()) {
- state.ReadVersion.Step = record.GetSnapshot().GetStep();
- state.ReadVersion.TxId = record.GetSnapshot().GetTxId();
- }
-
state.Reverse = record.GetReverse();
if (state.Reverse) {
state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1;
}
+ // Note: some checks already performed in TTxReadViaPipeline::Execute
if (state.PathId.OwnerId != Self->TabletID()) {
// owner is schemeshard, read user table
- if (state.PathId.OwnerId != Self->GetPathOwnerId()) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId
- << ", tableId: " << state.PathId.LocalPathId
- << ", from wrong owner: " << Self->GetPathOwnerId());
- return;
- }
+ Y_VERIFY(state.PathId.OwnerId == Self->GetPathOwnerId());
const auto tableId = state.PathId.LocalPathId;
auto it = Self->TableInfos.find(tableId);
- if (it == Self->TableInfos.end()) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Unknown table id: " << tableId);
- return;
- }
+ Y_VERIFY(it != Self->TableInfos.end());
auto& userTableInfo = it->second;
TableInfo = TShortTableInfo(userTableInfo);
- if (userTableInfo->IsBackup) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::BAD_REQUEST,
- "Can't read from a backup table");
- return;
- }
-
- if (!state.ReadVersion.IsMax()) {
- bool snapshotFound = false;
- if (!state.IsHeadRead) {
- const ui64 ownerId = state.PathId.OwnerId;
- TSnapshotKey snapshotKey(
- ownerId,
- tableId,
- state.ReadVersion.Step,
- state.ReadVersion.TxId);
-
- if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) {
- // TODO: do we need to acquire?
- SetUsingSnapshotFlag();
- snapshotFound = true;
- }
- }
-
- if (!snapshotFound) {
- if (Self->IsFollower()) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::UNSUPPORTED,
- TStringBuilder() << "Table id " << tableId
- << " reading from snapshot "
- << state.ReadVersion
- << " is not supported on follower shard " << Self->TabletID());
- return;
- }
-
- bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
- if (!isMvccReadable) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Table id " << tableId << " has no snapshot at "
- << state.ReadVersion << " shard " << Self->TabletID()
- << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
- << (Self->IsFollower() ? " RO replica" : ""));
- return;
- }
-
- bool isRepeatable = state.IsHeadRead ? false : true;
- SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId), isRepeatable);
- }
- }
+ Y_VERIFY(!userTableInfo->IsBackup);
+ Y_VERIFY(Self->IsMvccEnabled());
state.SchemaVersion = userTableInfo->GetTableSchemaVersion();
if (record.GetTableId().HasSchemaVersion()) {
@@ -1317,7 +1262,6 @@ public:
userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now();
} else {
// DS is owner, read system table
-
auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId);
if (!schema) {
SetStatusError(
@@ -1548,7 +1492,7 @@ private:
}
if (!snapshotFound) {
- bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
if (!isMvccReadable) {
SetStatusError(
Result->Record,
@@ -1730,6 +1674,9 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
TEvDataShard::TEvRead::TPtr Ev;
TReadIteratorId ReadId;
+ // When we need to reply with an error
+ std::unique_ptr<TEvDataShard::TEvReadResult> Reply;
+
TOperation::TPtr Op;
TVector<EExecutionUnitKind> CompleteList;
bool WaitComplete = false;
@@ -1747,8 +1694,8 @@ public:
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline execute"
<< ": at tablet# " << Self->TabletID());
- auto it = Self->ReadIterators.find(ReadId);
- if (it == Self->ReadIterators.end() && !Op) {
+ auto readIt = Self->ReadIterators.find(ReadId);
+ if (readIt == Self->ReadIterators.end() && !Op) {
// iterator aborted before we could start operation
return true;
}
@@ -1766,28 +1713,189 @@ public:
if (status != NKikimrTxDataShard::TError::OK) {
Y_VERIFY_DEBUG(!Op);
- if (Y_UNLIKELY(it == Self->ReadIterators.end())) {
+ if (Y_UNLIKELY(readIt == Self->ReadIterators.end())) {
// iterator already aborted
return true;
}
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
- SetStatusError(
- result->Record,
+ ReplyError(
Ydb::StatusIds::INTERNAL_ERROR,
TStringBuilder() << "Failed to sync follower: " << errMessage);
- result->Record.SetReadId(ReadId.ReadId);
- SendViaSession(it->second->SessionId, ReadId.Sender, Self->SelfId(), result.release());
-
return true;
}
}
if (Ev) {
+ // We must perform some initialization in transaction (e.g. after a follower sync), but before the operation is built
+ Y_VERIFY(readIt != Self->ReadIterators.end());
+ Y_VERIFY(readIt->second);
+ auto& state = *readIt->second;
+ auto* request = Ev->Get();
+ const auto& record = request->Record;
+
+ Y_VERIFY(state.State == TReadIteratorState::EState::Init);
+
+ bool setUsingSnapshotFlag = false;
+
+ // We assume that owner is schemeshard and it's a user table
+ if (state.PathId.OwnerId != Self->TabletID()) {
+ if (state.PathId.OwnerId != Self->GetPathOwnerId()) {
+ ReplyError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId
+ << ", tableId: " << state.PathId.LocalPathId
+ << ", from wrong owner: " << Self->GetPathOwnerId());
+ return true;
+ }
+
+ const auto tableId = state.PathId.LocalPathId;
+ auto it = Self->TableInfos.find(tableId);
+ if (it == Self->TableInfos.end()) {
+ ReplyError(
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Unknown table id: " << tableId);
+ return true;
+ }
+
+ auto& userTableInfo = it->second;
+ if (userTableInfo->IsBackup) {
+ ReplyError(
+ Ydb::StatusIds::BAD_REQUEST,
+ "Can't read from a backup table");
+ return true;
+ }
+
+ if (!Self->IsMvccEnabled()) {
+ ReplyError(
+ Ydb::StatusIds::UNSUPPORTED,
+ "Cannot use read iterators without mvcc");
+ return true;
+ }
+
+ if (state.IsHeadRead) {
+ // We want to try and choose a more specific non-repeatable snapshot
+ if (Self->IsFollower()) {
+ auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge();
+ // Note: during transition follower edge may be unitialized or lag behind
+ // We assume we can use it when it's not before low watermark
+ auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev();
+ if (maxRepeatable >= Self->GetSnapshotManager().GetLowWatermark()) {
+ state.ReadVersion = followerEdge;
+ state.IsHeadRead = !followerRepeatable;
+ }
+ } else {
+ state.ReadVersion = Self->GetMvccTxVersion(EMvccTxMode::ReadOnly);
+ }
+ if (!state.ReadVersion.IsMax()) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
+ Self->TabletID() << " changed HEAD read to "
+ << (state.IsHeadRead ? "non-repeatable" : "repeatable")
+ << " " << state.ReadVersion);
+ }
+ } else {
+ bool snapshotFound = false;
+
+ const ui64 ownerId = state.PathId.OwnerId;
+ TSnapshotKey snapshotKey(
+ ownerId,
+ tableId,
+ state.ReadVersion.Step,
+ state.ReadVersion.TxId);
+
+ if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) {
+ // TODO: do we need to acquire?
+ setUsingSnapshotFlag = true;
+ snapshotFound = true;
+ }
+
+ if (!snapshotFound) {
+ bool snapshotUnavailable = false;
+
+ if (state.ReadVersion < Self->GetSnapshotManager().GetLowWatermark() || state.ReadVersion.Step == Max<ui64>()) {
+ snapshotUnavailable = true;
+ }
+
+ if (Self->IsFollower()) {
+ auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge();
+ auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev();
+ if (state.ReadVersion > maxRepeatable) {
+ snapshotUnavailable = true;
+ }
+ } else {
+ auto prioritizedMvccSnapshotReads = Self->GetEnablePrioritizedMvccSnapshotReads();
+ TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads);
+ if (state.ReadVersion >= unreadableEdge) {
+ LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId);
+ Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(Ev), ctx);
+ Self->DeleteReadIterator(readIt);
+ return true;
+ }
+ }
+
+ if (snapshotUnavailable) {
+ ReplyError(
+ Ydb::StatusIds::PRECONDITION_FAILED,
+ TStringBuilder() << "Table id " << tableId << " has no snapshot at "
+ << state.ReadVersion << " shard " << Self->TabletID()
+ << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
+ << (Self->IsFollower() ? " RO replica" : ""));
+ return true;
+ }
+ }
+ }
+ } else {
+ // Handle system table reads
+ if (Self->IsFollower()) {
+ ReplyError(
+ Ydb::StatusIds::UNSUPPORTED,
+ "Followers don't support system table reads");
+ return true;
+ }
+ if (!state.IsHeadRead) {
+ ReplyError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion);
+ return true;
+ }
+ if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) {
+ ReplyError(
+ Ydb::StatusIds::BAD_REQUEST,
+ "Cannot read from user tables using system tables");
+ return true;
+ }
+ if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) {
+ ReplyError(
+ Ydb::StatusIds::UNSUPPORTED,
+ TStringBuilder() << "Unsupported result format "
+ << (int)record.GetResultFormat() << " when reading from system tables");
+ return true;
+ }
+ if (record.GetTableId().HasSchemaVersion()) {
+ ReplyError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Cannot request system table at shard " << record.GetTableId().GetOwnerId()
+ << ", localTid: " << record.GetTableId().GetTableId()
+ << ", with schema: " << record.GetTableId().GetSchemaVersion());
+ return true;
+ }
+
+ // We don't want this read to interact with other operations
+ setUsingSnapshotFlag = true;
+ }
+
const ui64 tieBreaker = Self->NextTieBreakerIndex++;
Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
Op->BuildExecutionPlan(false);
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
+ if (!state.ReadVersion.IsMax()) {
+ Op->SetMvccSnapshot(
+ TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId),
+ /* repeatable = */ state.IsHeadRead ? false : true);
+ }
+ if (setUsingSnapshotFlag) {
+ Op->SetUsingSnapshotFlag();
+ }
+
Ev = nullptr;
Op->IncrementInProgress();
}
@@ -1845,10 +1953,28 @@ public:
}
}
+ void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message) {
+ Reply = std::make_unique<TEvDataShard::TEvReadResult>();
+ SetStatusError(Reply->Record, code, message);
+ Reply->Record.SetReadId(ReadId.ReadId);
+ }
+
void Complete(const TActorContext& ctx) override {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete"
<< ": at tablet# " << Self->TabletID());
+ if (Reply) {
+ Y_VERIFY(!Op);
+ auto it = Self->ReadIterators.find(ReadId);
+ if (it != Self->ReadIterators.end()) {
+ Y_VERIFY(it->second);
+ auto& state = *it->second;
+ SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), Reply.release());
+ Self->DeleteReadIterator(it);
+ }
+ return;
+ }
+
if (!Op)
return;
@@ -1967,7 +2093,7 @@ public:
}
if (!snapshotFound) {
- bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
if (!isMvccReadable) {
SetStatusError(
Result->Record,
@@ -2216,6 +2342,14 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
+ if (Pipeline.HasProposeDelayers()) {
+ LWTRACK(ReadWaitProposeDelayers, request->Orbit);
+ Pipeline.RegisterWaitingReadIterator(readId, request);
+ DelayedProposeQueue.emplace_back().Reset(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
if (Pipeline.HasDrop()) {
replyWithError(
Ydb::StatusIds::INTERNAL_ERROR,
@@ -2249,123 +2383,24 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
TRowVersion readVersion = TRowVersion::Max();
- bool isHeadRead = false;
+ bool isHeadRead = true;
if (record.HasSnapshot()) {
readVersion.Step = record.GetSnapshot().GetStep();
readVersion.TxId = record.GetSnapshot().GetTxId();
- if (readVersion.IsMax()) {
+ if (readVersion.Step == Max<ui64>()) {
replyWithError(
Ydb::StatusIds::UNSUPPORTED,
"invalid snapshot value specified");
return;
}
+ isHeadRead = false;
}
- if (!IsFollower()) {
- if (record.GetTableId().GetOwnerId() != TabletID()) {
- // owner is schemeshard, read user table
- if (readVersion.IsMax()) {
- // transform a HEAD read into some non-repeatable snapshot
- readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr);
- ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step);
- ev->Get()->Record.MutableSnapshot()->SetTxId(readVersion.TxId);
- isHeadRead = true;
- LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed HEAD read into non-repeatable " << readVersion);
- }
-
- TSnapshotKey snapshotKey(
- record.GetTableId().GetOwnerId(),
- record.GetTableId().GetTableId(),
- readVersion.Step,
- readVersion.TxId);
-
- bool snapshotFound = GetSnapshotManager().FindAvailable(snapshotKey);
- if (!snapshotFound) {
- // check if there is MVCC version and maybe wait
- if (readVersion < GetSnapshotManager().GetLowWatermark()) {
- replyWithError(
- Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "MVCC read " << readVersion
- << " bellow low watermark " << GetSnapshotManager().GetLowWatermark());
- return;
- }
-
- // MVCC read is possible, but nead to check MVCC state and if we need to wait
-
- if (MvccSwitchState == TSwitchState::SWITCHING) {
- Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx);
- return;
- }
-
- auto prioritizedMvccSnapshotReads = GetEnablePrioritizedMvccSnapshotReads();
- TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads);
- if (readVersion >= unreadableEdge) {
- LWTRACK(ReadWaitSnapshot, request->Orbit, readVersion.Step, readVersion.TxId);
- Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx);
- return;
- }
-
- // we found proper MVCC snapshot
- snapshotFound = true;
- }
-
- if (!snapshotFound) {
- replyWithError(
- Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Neither regular nor MVCC snapshot for " << readVersion);
- return;
- }
- } else {
- // DS is owner, read system table
- if (!readVersion.IsMax()) {
- replyWithError(
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Only HEAD read from sys tables is allowed");
- return;
- }
-
- if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) {
- replyWithError(
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Only sys tables can be read by localTid, table "
- << record.GetTableId().GetTableId());
- return;
- }
-
- if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) {
- replyWithError(
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Sys tables can be read only in cellvec format, but requested "
- << (int)NKikimrTxDataShard::CELLVEC);
- return;
- }
-
- if (record.GetTableId().HasSchemaVersion()) {
- replyWithError(
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Requesting system stable owned " << record.GetTableId().GetOwnerId()
- << ", localTid: " << record.GetTableId().GetTableId()
- << ", with schema: " << record.GetTableId().GetSchemaVersion());
- return;
- }
- }
- } else {
- if (record.GetTableId().GetOwnerId() == TabletID()) {
- replyWithError(
- Ydb::StatusIds::UNSUPPORTED,
- "Systable reads on followers are not supported");
- return;
- }
-
- // follower: we can't check snapshot version, because need to sync and to sync
- // we need transaction
- if (readVersion.IsMax()) {
- isHeadRead = true;
- }
+ if (MvccSwitchState == TSwitchState::SWITCHING) {
+ Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx);
+ return;
}
- // Note: iterator is correct and ready to execute
-
TActorId sessionId;
if (readId.Sender.NodeId() != SelfId().NodeId()) {
Y_VERIFY_DEBUG(ev->InterconnectSession);
@@ -2386,7 +2421,10 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
ReadIterators.emplace(
readId,
- new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit)));
+ new TReadIteratorState(
+ readId, TPathId(record.GetTableId().GetOwnerId(), record.GetTableId().GetTableId()),
+ sessionId, readVersion, isHeadRead,
+ AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit)));
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp
index ee36aef45a7..1f1b57ab246 100644
--- a/ydb/core/tx/datashard/datashard_change_receiving.cpp
+++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp
@@ -381,6 +381,11 @@ public:
if (completeEdge) {
Self->PromoteCompleteEdge(completeEdge, txc);
+ // NOTE: asynchronous indexes are inconsistent by their nature
+ // We just promote follower read edge to the latest version
+ // and say it's repeatable, even though we cannot possibly know
+ // which versions are consistent and repeatable.
+ Self->GetSnapshotManager().PromoteFollowerReadEdge(completeEdge, true, txc);
}
return true;
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index 3ac4eb40b4d..2fa1b814409 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -140,6 +140,9 @@ void TDependencyTracker::FlushImmediateWrites() noexcept {
}
const TDependencyTracker::TDependencyTrackingLogic& TDependencyTracker::GetTrackingLogic() const noexcept {
+ if (Self->IsFollower())
+ return FollowerLogic;
+
if (Self->IsMvccEnabled())
return MvccLogic;
@@ -1074,5 +1077,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp
}
}
+void TDependencyTracker::TFollowerDependencyTrackingLogic::AddOperation(const TOperation::TPtr&) const noexcept {
+ // all follower operations are readonly and don't conflict
+}
+
+void TDependencyTracker::TFollowerDependencyTrackingLogic::RemoveOperation(const TOperation::TPtr&) const noexcept {
+ // all follower operations are readonly and don't conflict
+}
+
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h
index d6fab6ee5c6..4c5a3746b02 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.h
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.h
@@ -81,11 +81,17 @@ private:
void RemoveOperation(const TOperation::TPtr& op) const noexcept override;
};
+ struct TFollowerDependencyTrackingLogic : public TDependencyTrackingLogic {
+ explicit TFollowerDependencyTrackingLogic(TDependencyTracker& parent)
+ : TDependencyTrackingLogic(parent) {}
+
+ void AddOperation(const TOperation::TPtr& op) const noexcept override;
+ void RemoveOperation(const TOperation::TPtr& op) const noexcept override;
+ };
+
public:
TDependencyTracker(TDataShard* self)
: Self(self)
- , DefaultLogic(*this)
- , MvccLogic(*this)
{ }
public:
@@ -130,7 +136,7 @@ private:
const TDependencyTrackingLogic& GetTrackingLogic() const noexcept;
private:
- TDataShard* Self;
+ TDataShard* const Self;
// Temporary vectors for building dependencies
TKeys TmpRead;
TKeys TmpWrite;
@@ -158,8 +164,9 @@ private:
TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedPlannedWrites;
TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedImmediateWrites;
- const TDefaultDependencyTrackingLogic DefaultLogic;
- const TMvccDependencyTrackingLogic MvccLogic;
+ const TDefaultDependencyTrackingLogic DefaultLogic{ *this };
+ const TMvccDependencyTrackingLogic MvccLogic{ *this };
+ const TFollowerDependencyTrackingLogic FollowerLogic{ *this };
};
} // namespace NDataShard
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index b4b580226da..1940eb769b5 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -224,6 +224,7 @@ class TDataShard
class TTxVolatileTxAbort;
class TTxCdcStreamScanRun;
class TTxCdcStreamScanProgress;
+ class TTxUpdateFollowerReadEdge;
template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
@@ -1060,6 +1061,14 @@ class TDataShard
Sys_LastLoanTableTid, // 41 Last tid that we used in LoanTable
+ // The last step:txId that is unconditionally readable on followers
+ // without producing possibly inconsistent results. When repeatable
+ // is set leader will also never add new writes to this edge, making
+ // it possible to use the edge as a local snapshot.
+ SysMvcc_FollowerReadEdgeStep = 42,
+ SysMvcc_FollowerReadEdgeTxId = 43,
+ SysMvcc_FollowerReadEdgeRepeatable = 44,
+
// reserved
SysPipeline_Flags = 1000,
SysPipeline_LimitActiveTx,
@@ -1799,6 +1808,13 @@ public:
// Returns true when datashard is working in mvcc mode
bool IsMvccEnabled() const;
+ // Calculates current follower read edge
+ std::tuple<TRowVersion, bool, ui64> CalculateFollowerReadEdge() const;
+
+ // Promotes current follower read edge
+ bool PromoteFollowerReadEdge(TTransactionContext& txc);
+ bool PromoteFollowerReadEdge();
+
// Returns a suitable row version for performing a transaction
TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const;
@@ -2234,6 +2250,7 @@ private:
// For follower only
struct TFollowerState {
+ ui64 LastSysUpdate = 0;
ui64 LastSchemeUpdate = 0;
ui64 LastSnapshotsUpdate = 0;
};
@@ -2340,7 +2357,7 @@ private:
};
TProposeQueue ProposeQueue;
- TVector<THolder<TEvDataShard::TEvProposeTransaction::THandle>> DelayedProposeQueue;
+ TVector<THolder<IEventHandle>> DelayedProposeQueue;
TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig;
THolder<NTabletPipe::IClientCache> PipeClientCache;
@@ -2658,6 +2675,8 @@ private:
NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver;
+ bool UpdateFollowerReadEdgePending = false;
+
public:
auto& GetLockChangeRecords() {
return LockChangeRecords;
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 94e4646ed12..8a6e822d019 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -11,8 +11,6 @@
namespace NKikimr {
namespace NDataShard {
-#define LOAD_SYS_UI64(db, row, value) if (!TDataShard::SysGetUi64(db, row, value)) return false;
-
TPipeline::TPipeline(TDataShard * self)
: Self(self)
, DepTracker(self)
@@ -45,14 +43,19 @@ bool TPipeline::Load(NIceDb::TNiceDb& db) {
using Schema = TDataShard::Schema;
Y_VERIFY(!SchemaTx);
- LOAD_SYS_UI64(db, Schema::Sys_LastPlannedStep, LastPlannedTx.Step);
- LOAD_SYS_UI64(db, Schema::Sys_LastPlannedTx, LastPlannedTx.TxId);
- LOAD_SYS_UI64(db, Schema::Sys_LastCompleteStep, LastCompleteTx.Step);
- LOAD_SYS_UI64(db, Schema::Sys_LastCompleteTx, LastCompleteTx.TxId);
- LOAD_SYS_UI64(db, Schema::Sys_AliveStep, KeepSchemaStep);
- LOAD_SYS_UI64(db, Schema::SysPipeline_Flags, Config.Flags);
- LOAD_SYS_UI64(db, Schema::SysPipeline_LimitActiveTx, Config.LimitActiveTx);
- LOAD_SYS_UI64(db, Schema::SysPipeline_LimitDataTxCache, Config.LimitDataTxCache);
+
+ bool ready = true;
+ ready &= Self->SysGetUi64(db, Schema::Sys_LastPlannedStep, LastPlannedTx.Step);
+ ready &= Self->SysGetUi64(db, Schema::Sys_LastPlannedTx, LastPlannedTx.TxId);
+ ready &= Self->SysGetUi64(db, Schema::Sys_LastCompleteStep, LastCompleteTx.Step);
+ ready &= Self->SysGetUi64(db, Schema::Sys_LastCompleteTx, LastCompleteTx.TxId);
+ ready &= Self->SysGetUi64(db, Schema::Sys_AliveStep, KeepSchemaStep);
+ ready &= Self->SysGetUi64(db, Schema::SysPipeline_Flags, Config.Flags);
+ ready &= Self->SysGetUi64(db, Schema::SysPipeline_LimitActiveTx, Config.LimitActiveTx);
+ ready &= Self->SysGetUi64(db, Schema::SysPipeline_LimitDataTxCache, Config.LimitDataTxCache);
+ if (!ready) {
+ return false;
+ }
Config.Validate();
UtmostCompleteTx = LastCompleteTx;
@@ -941,6 +944,10 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc,
Self->TransQueue.RemoveTx(db, *op);
RemoveInReadSets(op, db);
+ if (Self->IsMvccEnabled()) {
+ Self->PromoteFollowerReadEdge(txc);
+ }
+
while (!DelayedAcks.empty()
&& DelayedAcks.begin()->first.Step <= OutdatedReadSetStep())
{
@@ -1960,7 +1967,7 @@ void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) {
}
bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const {
- if (!Self->IsMvccEnabled() || !op->IsMvccSnapshotRead() || op->HasWaitCompletionFlag())
+ if (Self->IsFollower() || !Self->IsMvccEnabled() || !op->IsMvccSnapshotRead() || op->HasWaitCompletionFlag())
return true;
// don't send errors early
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index 9b489f0f108..b328f4326dd 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -15,13 +15,31 @@ void TSnapshotManager::Reset() {
IncompleteEdge = TRowVersion::Min();
CompleteEdge = TRowVersion::Min();
LowWatermark = TRowVersion::Min();
+ ImmediateWriteEdge = TRowVersion::Min();
+ ImmediateWriteEdgeReplied = TRowVersion::Min();
+ UnprotectedReadEdge = TRowVersion::Min();
CommittedCompleteEdge = TRowVersion::Min();
+ FollowerReadEdge = TRowVersion::Min();
+ FollowerReadEdgeRepeatable = false;
Snapshots.clear();
}
bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
+ bool ready = true;
+
+ ready &= ReloadSys(db);
+ ready &= ReloadSnapshots(db);
+
+ if (ready) {
+ Self->SetCounter(COUNTER_MVCC_ENABLED, IsMvccEnabled() ? 1 : 0);
+ }
+
+ return ready;
+}
+
+bool TSnapshotManager::ReloadSys(NIceDb::TNiceDb& db) {
using Schema = TDataShard::Schema;
bool ready = true;
@@ -31,63 +49,33 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
TRowVersion incompleteEdge = TRowVersion::Min();
TRowVersion lowWatermark = TRowVersion::Min();
TRowVersion immediateWriteEdge = TRowVersion::Min();
+ TRowVersion followerReadEdge = TRowVersion::Min();
ui32 mvccState = 0;
ui64 keepSnapshotTimeout = 0;
ui64 unprotectedReads = 0;
-
- TSnapshotMap snapshots;
+ ui64 followerReadEdgeRepeatable = 0;
ready &= Self->SysGetUi64(db, Schema::Sys_MinWriteVersionStep, minWriteVersion.Step);
ready &= Self->SysGetUi64(db, Schema::Sys_MinWriteVersionTxId, minWriteVersion.TxId);
- if (!Self->IsFollower()) {
- // We don't currently support mvcc on the follower
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step);
- ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId);
- }
-
- {
- auto rowset = db.Table<Schema::Snapshots>().Range().Select();
- if (rowset.IsReady()) {
- while (!rowset.EndOfSet()) {
- ui64 oid = rowset.GetValue<Schema::Snapshots::Oid>();
- ui64 tid = rowset.GetValue<Schema::Snapshots::Tid>();
- ui64 step = rowset.GetValue<Schema::Snapshots::Step>();
- ui64 txId = rowset.GetValue<Schema::Snapshots::TxId>();
- TString name = rowset.GetValue<Schema::Snapshots::Name>();
- ui64 flags = rowset.GetValue<Schema::Snapshots::Flags>();
- ui64 timeout_ms = rowset.GetValue<Schema::Snapshots::TimeoutMs>();
-
- TSnapshotKey key(oid, tid, step, txId);
-
- auto res = snapshots.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(key),
- std::forward_as_tuple(key, std::move(name), flags, TDuration::MilliSeconds(timeout_ms)));
- Y_VERIFY_S(res.second, "Unexpected duplicate snapshot: " << key);
-
- if (!rowset.Next()) {
- ready = false;
- break;
- }
- }
- } else {
- ready = false;
- }
- }
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeStep, followerReadEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeTxId, followerReadEdge.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, followerReadEdgeRepeatable);
if (ready) {
- // We have a consistent view of snapshots table, apply
+ // We have a consistent view of settings, apply
MinWriteVersion = minWriteVersion;
MvccState = static_cast<EMvccState>(mvccState);
KeepSnapshotTimeout = keepSnapshotTimeout;
@@ -105,14 +93,46 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
ImmediateWriteEdgeReplied.TxId = Max<ui64>();
}
CommittedCompleteEdge = completeEdge;
- Snapshots = std::move(snapshots);
+ FollowerReadEdge = followerReadEdge;
+ FollowerReadEdgeRepeatable = (followerReadEdgeRepeatable != 0);
}
- if (ready) {
- Self->SetCounter(COUNTER_MVCC_ENABLED, IsMvccEnabled() ? 1 : 0);
+ return ready;
+}
+
+bool TSnapshotManager::ReloadSnapshots(NIceDb::TNiceDb& db) {
+ using Schema = TDataShard::Schema;
+
+ TSnapshotMap snapshots;
+
+ auto rowset = db.Table<Schema::Snapshots>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ ui64 oid = rowset.GetValue<Schema::Snapshots::Oid>();
+ ui64 tid = rowset.GetValue<Schema::Snapshots::Tid>();
+ ui64 step = rowset.GetValue<Schema::Snapshots::Step>();
+ ui64 txId = rowset.GetValue<Schema::Snapshots::TxId>();
+ TString name = rowset.GetValue<Schema::Snapshots::Name>();
+ ui64 flags = rowset.GetValue<Schema::Snapshots::Flags>();
+ ui64 timeout_ms = rowset.GetValue<Schema::Snapshots::TimeoutMs>();
+
+ TSnapshotKey key(oid, tid, step, txId);
+
+ auto res = snapshots.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(key),
+ std::forward_as_tuple(key, std::move(name), flags, TDuration::MilliSeconds(timeout_ms)));
+ Y_VERIFY_S(res.second, "Unexpected duplicate snapshot: " << key);
+
+ if (!rowset.Next()) {
+ return false;
+ }
}
- return ready;
+ Snapshots = std::move(snapshots);
+ return true;
}
void TSnapshotManager::InitExpireQueue(TInstant now) {
@@ -299,6 +319,33 @@ void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedRea
});
}
+std::pair<TRowVersion, bool> TSnapshotManager::GetFollowerReadEdge() const {
+ return { FollowerReadEdge, FollowerReadEdgeRepeatable };
+}
+
+bool TSnapshotManager::PromoteFollowerReadEdge(const TRowVersion& version, bool repeatable, TTransactionContext& txc) {
+ using Schema = TDataShard::Schema;
+
+ if (IsMvccEnabled()) {
+ if (FollowerReadEdge < version) {
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeStep, version.Step);
+ Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeTxId, version.TxId);
+ Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, ui64(repeatable ? 1 : 0));
+ FollowerReadEdge = version;
+ FollowerReadEdgeRepeatable = repeatable;
+ return true;
+ } else if (FollowerReadEdge == version && repeatable && !FollowerReadEdgeRepeatable) {
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, ui64(repeatable ? 1 : 0));
+ FollowerReadEdgeRepeatable = repeatable;
+ return true;
+ }
+ }
+
+ return false;
+}
+
void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) {
using Schema = TDataShard::Schema;
@@ -696,6 +743,7 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
if (CompleteEdge.Step < ImmediateWriteEdgeReplied.Step) {
PromoteCompleteEdge(ImmediateWriteEdgeReplied.Step, txc);
+ Self->PromoteFollowerReadEdge(txc);
}
// Calculate the maximum version where we may have written something
@@ -705,7 +753,13 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
maxWriteVersion = ImmediateWriteEdge;
}
- removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion));
+ // Make sure we don't leave followers without any repeatable read version
+ TRowVersion maxRepeatableRead = FollowerReadEdge;
+ if (maxRepeatableRead && !FollowerReadEdgeRepeatable) {
+ maxRepeatableRead = maxRepeatableRead.Prev();
+ }
+
+ removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion, maxRepeatableRead));
LastAdvanceWatermark = NActors::TActivationContext::Monotonic();
return removed;
diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h
index eb868fa4dbc..44e17632656 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.h
+++ b/ydb/core/tx/datashard/datashard_snapshots.h
@@ -109,6 +109,8 @@ public:
void Reset();
bool Reload(NIceDb::TNiceDb& db);
+ bool ReloadSys(NIceDb::TNiceDb& db);
+ bool ReloadSnapshots(NIceDb::TNiceDb& db);
void InitExpireQueue(TInstant now);
@@ -144,6 +146,9 @@ public:
bool IsPerformedUnprotectedReadsCommitted() const;
void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc);
+ std::pair<TRowVersion, bool> GetFollowerReadEdge() const;
+ bool PromoteFollowerReadEdge(const TRowVersion& version, bool repeatable, TTransactionContext& txc);
+
EMvccState GetMvccState() const {
return MvccState;
}
@@ -228,6 +233,8 @@ private:
TRowVersion UnprotectedReadEdge = TRowVersion::Min();
TRowVersion CommittedCompleteEdge = TRowVersion::Min();
+ TRowVersion FollowerReadEdge = TRowVersion::Min();
+ bool FollowerReadEdgeRepeatable = false;
TSnapshotMap Snapshots;
diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index 41dd9f45c4f..2d33d53ee3e 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -281,6 +281,10 @@ public:
kv.second.OptimizeSplitKeys(rdb);
}
+ if (mvcc) {
+ Self->PromoteFollowerReadEdge(txc);
+ }
+
Self->State = TShardState::Ready;
Self->PersistSys(db, Schema::Sys_State, Self->State);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index f2938f5ad2a..829c2589301 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -2312,7 +2312,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
TestReadKey(NKikimrTxDataShard::CELLVEC, true);
}
- Y_UNIT_TEST(ShouldNotReadMvccFromFollower) {
+ Y_UNIT_TEST(ShouldNotReadFutureMvccFromFollower) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
@@ -2326,7 +2326,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
AddKeyQuery(*request, {3, 3, 3});
auto readResult = helper.SendRead("table-1", request.release());
const auto& record = readResult->Record;
- UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::PRECONDITION_FAILED);
}
Y_UNIT_TEST(ShouldReadHeadFromFollower) {
@@ -2791,24 +2791,27 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
switch (event->GetTypeRewrite()) {
case TEvMediatorTimecast::EvUpdate: {
+ auto* update = event->Get<TEvMediatorTimecast::TEvUpdate>();
+ lastStep = update->Record.GetTimeBarrier();
+ Cerr << "... observed TEvUpdate(" << lastStep << ")" << Endl;
if (captureTimecast) {
- auto update = event->Get<TEvMediatorTimecast::TEvUpdate>();
- lastStep = update->Record.GetTimeBarrier();
Cerr << "---- dropped EvUpdate ----" << Endl;
return TTestActorRuntime::EEventAction::DROP;
}
break;
}
case TEvMediatorTimecast::EvWaitPlanStep: {
+ auto* waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
+ Cerr << "... observed TEvWaitPlanStep(" << waitEvent->PlanStep << ")" << Endl;
if (captureWaitNotify) {
- auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
waitPlanStep = waitEvent->PlanStep;
}
break;
}
case TEvMediatorTimecast::EvNotifyPlanStep: {
+ auto* notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>();
+ Cerr << "... observed TEvNotifyPlanStep(" << notifyEvent->PlanStep << ")" << Endl;
if (captureWaitNotify) {
- auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>();
notifyPlanStep = notifyEvent->PlanStep;
}
break;
@@ -2835,7 +2838,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
captureWaitNotify = true;
// future snapshot
- snapshot = TRowVersion(lastStep + 1000, Max<ui64>());
+ snapshot = TRowVersion(lastStep + 3000, Max<ui64>());
auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot);
AddKeyQuery(*request1, {3, 3, 3});
@@ -2845,7 +2848,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto readResult1 = helper.SendRead("table-1", request1.release());
- waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep");
+ waitFor([&]{ return notifyPlanStep >= snapshot.Step; }, TStringBuilder() << "intercepted TEvNotifyPlanStep for snapshot " << snapshot);
UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step);
UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step);
@@ -2917,24 +2920,27 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
switch (event->GetTypeRewrite()) {
case TEvMediatorTimecast::EvUpdate: {
+ auto* update = event->Get<TEvMediatorTimecast::TEvUpdate>();
+ lastStep = update->Record.GetTimeBarrier();
+ Cerr << "... observed TEvUpdate(" << lastStep << ")" << Endl;
if (captureTimecast) {
- auto update = event->Get<TEvMediatorTimecast::TEvUpdate>();
- lastStep = update->Record.GetTimeBarrier();
Cerr << "---- dropped EvUpdate ----" << Endl;
return TTestActorRuntime::EEventAction::DROP;
}
break;
}
case TEvMediatorTimecast::EvWaitPlanStep: {
+ auto* waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
+ Cerr << "... observed TEvWaitPlanStep(" << waitEvent->PlanStep << ")" << Endl;
if (captureWaitNotify) {
- auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
waitPlanStep = waitEvent->PlanStep;
}
break;
}
case TEvMediatorTimecast::EvNotifyPlanStep: {
+ auto* notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>();
+ Cerr << "... observed TEvNotifyPlanStep(" << notifyEvent->PlanStep << ")" << Endl;
if (captureWaitNotify) {
- auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>();
notifyPlanStep = notifyEvent->PlanStep;
}
break;
@@ -2965,7 +2971,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
captureWaitNotify = true;
// future snapshot
- snapshot = TRowVersion(lastStep + 1000, Max<ui64>());
+ snapshot = TRowVersion(lastStep + 3000, Max<ui64>());
auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot);
AddKeyQuery(*request1, {3, 3, 3});
@@ -2975,13 +2981,13 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.SendReadAsync("table-1", request1.release());
- waitFor([&]{ return waitPlanStep != 0; }, "intercepted TEvWaitPlanStep");
+ waitFor([&]{ return waitPlanStep >= snapshot.Step; }, "intercepted TEvWaitPlanStep");
UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step);
- UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, 0);
+ UNIT_ASSERT_LT(notifyPlanStep, snapshot.Step);
helper.SendCancel("table-1", 1);
- waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep");
+ waitFor([&]{ return notifyPlanStep >= snapshot.Step; }, "intercepted TEvNotifyPlanStep");
UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step);
UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step);
@@ -3522,13 +3528,13 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
auto readResult = helper.SendRead("table-1", request.release());
const auto& record = readResult->Record;
- UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
}
};
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
Y_UNIT_TEST(ShouldCalculateQuota) {
- NDataShard::TReadIteratorState state({}, false, {});
+ NDataShard::TReadIteratorState state(TReadIteratorId({}, 0), TPathId(0, 0), {}, TRowVersion::Max(), true, {});
state.Quota.Rows = 100;
state.Quota.Bytes = 1000;
state.ConsumeSeqNo(10, 100); // seqno1
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index 908bfcd853d..0dbd327a8f5 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -81,6 +81,8 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
if (op->IsAborted()) {
// Make sure we confirm aborts with a commit
op->SetWaitCompletionFlag(true);
+ } else if (DataShard.IsFollower()) {
+ // It doesn't matter whether we wait or not
} else if (DataShard.IsMvccEnabled() && op->IsImmediate()) {
auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc);
diff --git a/ydb/core/tx/datashard/follower_edge.cpp b/ydb/core/tx/datashard/follower_edge.cpp
new file mode 100644
index 00000000000..846c5784d2c
--- /dev/null
+++ b/ydb/core/tx/datashard/follower_edge.cpp
@@ -0,0 +1,98 @@
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+std::tuple<TRowVersion, bool, ui64> TDataShard::CalculateFollowerReadEdge() const {
+ Y_VERIFY(!IsFollower());
+ Y_VERIFY_DEBUG(IsMvccEnabled());
+
+ for (auto order : TransQueue.GetPlan()) {
+ // When we have planned operations we assume the first one may be used
+ // for new writes, so we mark is as non-repeatable. We could skip
+ // readonly operations, but there's little benefit in that, and it's
+ // complicated to determine which is the first readable given we may
+ // have executed some out of order.
+ return { TRowVersion(order.Step, order.TxId), false, 0 };
+ }
+
+ // This is the max version where we had any writes
+ TRowVersion maxWrite(SnapshotManager.GetCompleteEdge().Step, Max<ui64>());
+ if (maxWrite < SnapshotManager.GetImmediateWriteEdge()) {
+ maxWrite = SnapshotManager.GetImmediateWriteEdge();
+ }
+
+ // This is the next version that would be used for new writes
+ TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
+
+ if (maxWrite < nextWrite) {
+ return { maxWrite, true, 0 };
+ }
+
+ TRowVersion maxObserved(GetMaxObservedStep(), Max<ui64>());
+ if (maxObserved < maxWrite) {
+ return { maxObserved, true, maxWrite.Step };
+ }
+
+ return { maxWrite, false, maxWrite.Next().Step };
+}
+
+bool TDataShard::PromoteFollowerReadEdge(TTransactionContext& txc) {
+ Y_VERIFY(!IsFollower());
+
+ if (IsMvccEnabled()) {
+ auto [version, repeatable, waitStep] = CalculateFollowerReadEdge();
+
+ if (waitStep) {
+ WaitPlanStep(waitStep);
+ }
+
+ return SnapshotManager.PromoteFollowerReadEdge(version, repeatable, txc);
+ }
+
+ return false;
+}
+
+class TDataShard::TTxUpdateFollowerReadEdge
+ : public NTabletFlatExecutor::TTransactionBase<TDataShard>
+{
+public:
+ TTxUpdateFollowerReadEdge(TDataShard* self)
+ : TBase(self)
+ {}
+
+ TTxType GetTxType() const override { return TXTYPE_UPDATE_FOLLOWER_READ_EDGE; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Y_VERIFY(Self->UpdateFollowerReadEdgePending);
+ Self->UpdateFollowerReadEdgePending = false;
+ Self->PromoteFollowerReadEdge(txc);
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+};
+
+bool TDataShard::PromoteFollowerReadEdge() {
+ Y_VERIFY(!IsFollower());
+
+ if (IsMvccEnabled()) {
+ auto [currentEdge, currentRepeatable] = SnapshotManager.GetFollowerReadEdge();
+ auto [nextEdge, nextRepeatable, waitStep] = CalculateFollowerReadEdge();
+
+ if (currentEdge < nextEdge || currentEdge == nextEdge && !currentRepeatable && nextRepeatable) {
+ if (!UpdateFollowerReadEdgePending) {
+ UpdateFollowerReadEdgePending = true;
+ Execute(new TTxUpdateFollowerReadEdge(this));
+ }
+ return true;
+ } else if (waitStep) {
+ WaitPlanStep(waitStep);
+ }
+ }
+
+ return false;
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h
index e63f21a9016..b7d96b79bcb 100644
--- a/ydb/core/tx/datashard/probes.h
+++ b/ydb/core/tx/datashard/probes.h
@@ -51,6 +51,10 @@
GROUPS("DataShard"), \
TYPES(), \
NAMES()) \
+ PROBE(ReadWaitProposeDelayers, \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
PROBE(ReadWaitSnapshot, \
GROUPS("DataShard"), \
TYPES(ui64, ui64), \
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index eb87c540c29..f57dd1ac839 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -64,8 +64,14 @@ struct TReadIteratorState {
};
public:
- TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts, NLWTrace::TOrbit&& orbit = {})
- : IsHeadRead(isHeadRead)
+ TReadIteratorState(
+ const TReadIteratorId& readId, const TPathId& pathId,
+ const TActorId& sessionId, const TRowVersion& readVersion, bool isHeadRead,
+ TMonotonic ts, NLWTrace::TOrbit&& orbit = {})
+ : ReadId(readId.ReadId)
+ , PathId(pathId)
+ , ReadVersion(readVersion)
+ , IsHeadRead(isHeadRead)
, SessionId(sessionId)
, StartTs(ts)
, Orbit(std::move(orbit))
@@ -154,11 +160,11 @@ public:
// Data from original request //
- ui64 ReadId = 0;
+ ui64 ReadId;
TPathId PathId;
std::vector<NTable::TTag> Columns;
- TRowVersion ReadVersion = TRowVersion::Max();
- bool IsHeadRead = false;
+ TRowVersion ReadVersion;
+ bool IsHeadRead;
ui64 LockId = 0;
ui32 LockNodeId = 0;
TLockInfo::TPtr Lock;