diff options
author | snaury <snaury@ydb.tech> | 2023-06-08 10:33:52 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-06-08 10:33:52 +0300 |
commit | 81d39471f2e9dc990e644f2f0547560e65f092b4 (patch) | |
tree | a2684fc02599a548e9d9b7d34555704182434de8 | |
parent | b62949a9443463174da1e380a0403934d8030cbe (diff) | |
download | ydb-81d39471f2e9dc990e644f2f0547560e65f092b4.tar.gz |
Support read iterator repeatable read on followers
23 files changed, 741 insertions, 382 deletions
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index 05819756ef8..9924f1ddb43 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue){ - return issue.GetMessage().Contains("bellow low watermark"); + return issue.GetMessage().Contains("has no snapshot at"); }), result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 4971a76c62d..d43054d9461 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -453,4 +453,5 @@ enum ETxTypes { TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}]; TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}]; TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}]; + TXTYPE_UPDATE_FOLLOWER_READ_EDGE = 78 [(TxTypeOpts) = {Name: "TxUpdateFollowerReadEdge"}]; } diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 28c5b364bbe..c31e6b4656b 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -269,6 +269,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 41bb30c51fd..6ec99292a69 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 41bb30c51fd..6ec99292a69 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index a7eb4c861d0..0c73d36c2e5 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -270,6 +270,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finalize_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/finish_propose_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/follower_edge.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/initiate_build_index_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/key_conflicts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/load_and_wait_in_rs_unit.cpp diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp index 53172927bc5..8f2c4518615 100644 --- a/ydb/core/tx/datashard/check_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp @@ -86,7 +86,19 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op, if (tx->IsMvccSnapshotRead()) { auto snapshot = tx->GetMvccSnapshot(); - if (!DataShard.IsMvccEnabled()) { + if (DataShard.IsFollower()) { + TString err = TStringBuilder() + << "Operation " << *op << " cannot read from snapshot " << snapshot + << " using data tx on a follower " << DataShard.TabletID(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } else if (!DataShard.IsMvccEnabled()) { TString err = TStringBuilder() << "Operation " << *op << " reads from snapshot " << snapshot << " with MVCC feature disabled at " << DataShard.TabletID(); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 2dc2c39d736..960457cb2ad 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1885,6 +1885,9 @@ bool TDataShard::IsMvccEnabled() const { } TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const { + if (IsFollower()) + return {TRowVersion::Max(), TRowVersion::Max()}; + if (!IsMvccEnabled()) return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()}; @@ -1983,6 +1986,10 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const } TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const { + if (IsFollower()) { + return {TRowVersion::Max(), TRowVersion::Max()}; + } + if (!IsMvccEnabled()) return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()}; @@ -2000,6 +2007,8 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const { TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges( const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc) { + Y_VERIFY(!IsFollower(), "Unexpected attempt to promote edges on a follower"); + TPromotePostExecuteEdges res; res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc); @@ -2050,6 +2059,10 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc); } res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc); + if (res.HadWrites) { + // Promoting write edges may promote read edge + PromoteFollowerReadEdge(txc); + } break; } } @@ -2198,6 +2211,10 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) { } break; } + + if (IsMvccEnabled()) { + PromoteFollowerReadEdge(); + } } void TDataShard::CheckMediatorStateRestored() { diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index d609d6813ea..28d7730929b 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -146,8 +146,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { #define PRECHARGE_SYS_TABLE(table) \ { \ if (txc.DB.GetScheme().GetTableInfo(table::TableId)) { \ - auto rowset = db.Table<table>().Range().Select(); \ - ready &= rowset.IsReady(); \ + ready &= db.Table<table>().Precharge(); \ } \ } @@ -689,91 +688,149 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont auto* userTablesSchema = scheme.GetTableInfo(Schema::UserTables::TableId); Y_VERIFY(userTablesSchema, "UserTables"); - // Check if user tables schema has changed since last time we synchronized it + // Check if tables changed since last time we synchronized them + ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial; ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial; - if (lastSchemeUpdate > FollowerState.LastSchemeUpdate) { - NIceDb::TNiceDb db(txc.DB); - { - LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, - "Updating tables metadata on follower, tabletId %" PRIu64 - " prevGen %" PRIu64 " prevStep %" PRIu64 " newGen %" PRIu64 " newStep %" PRIu64, - TabletID(), FollowerState.LastSchemeUpdate >> 32, - FollowerState.LastSchemeUpdate & (ui32)-1, - lastSchemeUpdate >> 32, lastSchemeUpdate & (ui32)-1); - - // Reload user tables metadata - TableInfos.clear(); - - if (userTablesSchema->Columns.contains(Schema::UserTables::ShadowTid::ColumnId)) { - // New schema with ShadowTid column - auto rowset = db.Table<Schema::UserTables>().Select< - Schema::UserTables::Tid, - Schema::UserTables::LocalTid, - Schema::UserTables::Schema, - Schema::UserTables::ShadowTid>(); - if (!rowset.IsReady()) + ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId) + ? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0; + + NIceDb::TNiceDb db(txc.DB); + + bool precharged = true; + bool updated = false; + if (FollowerState.LastSysUpdate < lastSysUpdate) { + if (!db.Table<Schema::Sys>().Precharge()) { + precharged = false; + } + updated = true; + } + if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) { + if (!db.Table<Schema::UserTables>().Precharge()) { + precharged = false; + } + updated = true; + } + if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) { + if (!db.Table<Schema::Snapshots>().Precharge()) { + precharged = false; + } + updated = true; + } + + if (!updated) { + return true; + } + + if (!precharged) { + return false; + } + + if (FollowerState.LastSysUpdate < lastSysUpdate) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Updating sys metadata on follower, tabletId " << TabletID() + << " prevGen " << (FollowerState.LastSysUpdate >> 32) + << " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1) + << " newGen " << (lastSysUpdate >> 32) + << " newStep " << (lastSysUpdate & (ui32)-1)); + + bool ready = true; + ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId); + ready &= SysGetUi64(db, Schema::Sys_CurrentSchemeShardId, CurrentSchemeShardId); + ready &= SnapshotManager.ReloadSys(db); + if (!ready) { + return false; + } + + FollowerState.LastSysUpdate = lastSysUpdate; + } + + if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Updating tables metadata on follower, tabletId " << TabletID() + << " prevGen " << (FollowerState.LastSchemeUpdate >> 32) + << " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1) + << " newGen " << (lastSchemeUpdate >> 32) + << " newStep " << (lastSchemeUpdate & (ui32)-1)); + + struct TRow { + TPathId TableId; + TUserTable::TPtr Table; + }; + + std::vector<TRow> tables; + + if (userTablesSchema->Columns.contains(Schema::UserTables::ShadowTid::ColumnId)) { + // New schema with ShadowTid column + auto rowset = db.Table<Schema::UserTables>().Select< + Schema::UserTables::Tid, + Schema::UserTables::LocalTid, + Schema::UserTables::Schema, + Schema::UserTables::ShadowTid>(); + if (!rowset.IsReady()) + return false; + while (!rowset.EndOfSet()) { + ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>(); + ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>(); + ui32 shadowTid = rowset.GetValueOrDefault<Schema::UserTables::ShadowTid>(); + TString schema = rowset.GetValue<Schema::UserTables::Schema>(); + NKikimrSchemeOp::TTableDescription descr; + bool parseOk = ParseFromStringNoSizeLimit(descr, schema); + Y_VERIFY(parseOk); + tables.push_back(TRow{ + TPathId(GetPathOwnerId(), tableId), + new TUserTable(localTid, descr, shadowTid), + }); + if (!rowset.Next()) return false; - while (!rowset.EndOfSet()) { - ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>(); - ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>(); - ui32 shadowTid = rowset.GetValueOrDefault<Schema::UserTables::ShadowTid>(); - TString schema = rowset.GetValue<Schema::UserTables::Schema>(); - NKikimrSchemeOp::TTableDescription descr; - bool parseOk = ParseFromStringNoSizeLimit(descr, schema); - Y_VERIFY(parseOk); - AddUserTable(TPathId(GetPathOwnerId(), tableId), new TUserTable(localTid, descr, shadowTid)); - if (!rowset.Next()) - return false; - } - } else { - // Older schema without ShadowTid column - auto rowset = db.Table<Schema::UserTables>().Select< - Schema::UserTables::Tid, - Schema::UserTables::LocalTid, - Schema::UserTables::Schema>(); - if (!rowset.IsReady()) + } + } else { + // Older schema without ShadowTid column + auto rowset = db.Table<Schema::UserTables>().Select< + Schema::UserTables::Tid, + Schema::UserTables::LocalTid, + Schema::UserTables::Schema>(); + if (!rowset.IsReady()) + return false; + while (!rowset.EndOfSet()) { + ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>(); + ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>(); + ui32 shadowTid = 0; + TString schema = rowset.GetValue<Schema::UserTables::Schema>(); + NKikimrSchemeOp::TTableDescription descr; + bool parseOk = ParseFromStringNoSizeLimit(descr, schema); + Y_VERIFY(parseOk); + tables.push_back(TRow{ + TPathId(GetPathOwnerId(), tableId), + new TUserTable(localTid, descr, shadowTid), + }); + if (!rowset.Next()) return false; - while (!rowset.EndOfSet()) { - ui64 tableId = rowset.GetValue<Schema::UserTables::Tid>(); - ui32 localTid = rowset.GetValue<Schema::UserTables::LocalTid>(); - ui32 shadowTid = 0; - TString schema = rowset.GetValue<Schema::UserTables::Schema>(); - NKikimrSchemeOp::TTableDescription descr; - bool parseOk = ParseFromStringNoSizeLimit(descr, schema); - Y_VERIFY(parseOk); - AddUserTable(TPathId(GetPathOwnerId(), tableId), new TUserTable(localTid, descr, shadowTid)); - if (!rowset.Next()) - return false; - } } } + + TableInfos.clear(); + for (auto& table : tables) { + AddUserTable(table.TableId, std::move(table.Table)); + } + FollowerState.LastSchemeUpdate = lastSchemeUpdate; } // N.B. follower with snapshots support may be loaded in datashard without a snapshots table - if (scheme.GetTableInfo(Schema::Snapshots::TableId)) { - ui64 lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId).Serial; - if (lastSnapshotsUpdate > FollowerState.LastSnapshotsUpdate) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Updating snapshots metadata on follower, tabletId " << TabletID() - << " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32) - << " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1) - << " newGen " << (lastSnapshotsUpdate >> 32) - << " newStep " << (lastSnapshotsUpdate & (ui32)-1)); - - NIceDb::TNiceDb db(txc.DB); - if (!SnapshotManager.Reload(db)) { - return false; - } + if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Updating snapshots metadata on follower, tabletId " << TabletID() + << " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32) + << " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1) + << " newGen " << (lastSnapshotsUpdate >> 32) + << " newStep " << (lastSnapshotsUpdate & (ui32)-1)); - FollowerState.LastSnapshotsUpdate = lastSnapshotsUpdate; + NIceDb::TNiceDb db(txc.DB); + if (!SnapshotManager.ReloadSnapshots(db)) { + return false; } - // Initialize PathOwnerId (required for snapshot keys) - if (PathOwnerId == INVALID_TABLET_ID) { - NIceDb::TNiceDb db(txc.DB); - LOAD_SYS_UI64(db, Schema::Sys_PathOwnerId, PathOwnerId); - } + FollowerState.LastSnapshotsUpdate = lastSnapshotsUpdate; } return true; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index e47d076a550..0852e9e2eff 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1046,7 +1046,7 @@ public: } if (!snapshotFound) { - bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); if (!isMvccReadable) { SetStatusError( Result->Record, @@ -1120,32 +1120,51 @@ public: TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead; - if (state.IsHeadRead && !Self->IsFollower()) { + if (state.IsHeadRead) { bool hasError = !Result || Result->Record.HasStatus(); if (!hasError && Reader->HasUnreadQueries()) { - // we failed to read all at once and also there might be dependency - // we need to wait for: after its execution we can read MVCC snapshot + // We failed to read everything in a single transaction + // We would prefer to return current result and continue reading, + // but we may have to retry at a different version or wait for + // additional dependencies before retrying. state.IsHeadRead = false; - // repeatable read - SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true); + if (!Self->IsFollower()) { + // Switch to repeatable read at the same version + SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true); - TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId); - const auto& plannedOps = Self->Pipeline.GetActivePlannedOps(); - auto it = plannedOps.lower_bound(order); - if (it != plannedOps.end() && it->first == order) { - if (!it->second->IsReadOnly()) { - // we need to wait this op - AddDependency(it->second); + TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId); + const auto& plannedOps = Self->Pipeline.GetActivePlannedOps(); + auto it = plannedOps.lower_bound(order); + if (it != plannedOps.end() && it->first == order) { + if (!it->second->IsReadOnly()) { + // we need to wait this op + AddDependency(it->second); - // just for sanity: result should not contain anything at this step + // Make sure current incomplete result will not be sent + Result.reset(new TEvDataShard::TEvReadResult()); + + return EExecutionStatus::Continue; + } + } + } else { + auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge(); + auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev(); + if (maxRepeatable >= Self->GetSnapshotManager().GetLowWatermark() && maxRepeatable < state.ReadVersion) { + // We need to retry at a different version + state.ReadVersion = maxRepeatable; + SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true); + + // Make sure current incomplete result will not be sent Result.reset(new TEvDataShard::TEvReadResult()); - return EExecutionStatus::Continue; + return EExecutionStatus::Reschedule; } } + + // We will send current incomplete result and continue reading from snapshot } else { - // either error or full read done + // Either error or a complete result readType = TDataShard::EPromotePostExecuteEdges::ReadOnly; } } @@ -1194,11 +1213,6 @@ public: const auto& record = Request->Record; - state.ReadId = record.GetReadId(); - state.PathId = TPathId( - record.GetTableId().GetOwnerId(), - record.GetTableId().GetTableId()); - if (record.HasMaxRows()) state.Quota.Rows = record.GetMaxRows(); @@ -1211,94 +1225,25 @@ public: if (record.HasMaxRowsInResult()) state.MaxRowsInResult = record.GetMaxRowsInResult(); - if (record.HasSnapshot()) { - state.ReadVersion.Step = record.GetSnapshot().GetStep(); - state.ReadVersion.TxId = record.GetSnapshot().GetTxId(); - } - state.Reverse = record.GetReverse(); if (state.Reverse) { state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1; } + // Note: some checks already performed in TTxReadViaPipeline::Execute if (state.PathId.OwnerId != Self->TabletID()) { // owner is schemeshard, read user table - if (state.PathId.OwnerId != Self->GetPathOwnerId()) { - SetStatusError( - Result->Record, - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId - << ", tableId: " << state.PathId.LocalPathId - << ", from wrong owner: " << Self->GetPathOwnerId()); - return; - } + Y_VERIFY(state.PathId.OwnerId == Self->GetPathOwnerId()); const auto tableId = state.PathId.LocalPathId; auto it = Self->TableInfos.find(tableId); - if (it == Self->TableInfos.end()) { - SetStatusError( - Result->Record, - Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Unknown table id: " << tableId); - return; - } + Y_VERIFY(it != Self->TableInfos.end()); auto& userTableInfo = it->second; TableInfo = TShortTableInfo(userTableInfo); - if (userTableInfo->IsBackup) { - SetStatusError( - Result->Record, - Ydb::StatusIds::BAD_REQUEST, - "Can't read from a backup table"); - return; - } - - if (!state.ReadVersion.IsMax()) { - bool snapshotFound = false; - if (!state.IsHeadRead) { - const ui64 ownerId = state.PathId.OwnerId; - TSnapshotKey snapshotKey( - ownerId, - tableId, - state.ReadVersion.Step, - state.ReadVersion.TxId); - - if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) { - // TODO: do we need to acquire? - SetUsingSnapshotFlag(); - snapshotFound = true; - } - } - - if (!snapshotFound) { - if (Self->IsFollower()) { - SetStatusError( - Result->Record, - Ydb::StatusIds::UNSUPPORTED, - TStringBuilder() << "Table id " << tableId - << " reading from snapshot " - << state.ReadVersion - << " is not supported on follower shard " << Self->TabletID()); - return; - } - - bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); - if (!isMvccReadable) { - SetStatusError( - Result->Record, - Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Table id " << tableId << " has no snapshot at " - << state.ReadVersion << " shard " << Self->TabletID() - << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() - << (Self->IsFollower() ? " RO replica" : "")); - return; - } - - bool isRepeatable = state.IsHeadRead ? false : true; - SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId), isRepeatable); - } - } + Y_VERIFY(!userTableInfo->IsBackup); + Y_VERIFY(Self->IsMvccEnabled()); state.SchemaVersion = userTableInfo->GetTableSchemaVersion(); if (record.GetTableId().HasSchemaVersion()) { @@ -1317,7 +1262,6 @@ public: userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now(); } else { // DS is owner, read system table - auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId); if (!schema) { SetStatusError( @@ -1548,7 +1492,7 @@ private: } if (!snapshotFound) { - bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); if (!isMvccReadable) { SetStatusError( Result->Record, @@ -1730,6 +1674,9 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB TEvDataShard::TEvRead::TPtr Ev; TReadIteratorId ReadId; + // When we need to reply with an error + std::unique_ptr<TEvDataShard::TEvReadResult> Reply; + TOperation::TPtr Op; TVector<EExecutionUnitKind> CompleteList; bool WaitComplete = false; @@ -1747,8 +1694,8 @@ public: LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline execute" << ": at tablet# " << Self->TabletID()); - auto it = Self->ReadIterators.find(ReadId); - if (it == Self->ReadIterators.end() && !Op) { + auto readIt = Self->ReadIterators.find(ReadId); + if (readIt == Self->ReadIterators.end() && !Op) { // iterator aborted before we could start operation return true; } @@ -1766,28 +1713,189 @@ public: if (status != NKikimrTxDataShard::TError::OK) { Y_VERIFY_DEBUG(!Op); - if (Y_UNLIKELY(it == Self->ReadIterators.end())) { + if (Y_UNLIKELY(readIt == Self->ReadIterators.end())) { // iterator already aborted return true; } - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); - SetStatusError( - result->Record, + ReplyError( Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Failed to sync follower: " << errMessage); - result->Record.SetReadId(ReadId.ReadId); - SendViaSession(it->second->SessionId, ReadId.Sender, Self->SelfId(), result.release()); - return true; } } if (Ev) { + // We must perform some initialization in transaction (e.g. after a follower sync), but before the operation is built + Y_VERIFY(readIt != Self->ReadIterators.end()); + Y_VERIFY(readIt->second); + auto& state = *readIt->second; + auto* request = Ev->Get(); + const auto& record = request->Record; + + Y_VERIFY(state.State == TReadIteratorState::EState::Init); + + bool setUsingSnapshotFlag = false; + + // We assume that owner is schemeshard and it's a user table + if (state.PathId.OwnerId != Self->TabletID()) { + if (state.PathId.OwnerId != Self->GetPathOwnerId()) { + ReplyError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId + << ", tableId: " << state.PathId.LocalPathId + << ", from wrong owner: " << Self->GetPathOwnerId()); + return true; + } + + const auto tableId = state.PathId.LocalPathId; + auto it = Self->TableInfos.find(tableId); + if (it == Self->TableInfos.end()) { + ReplyError( + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Unknown table id: " << tableId); + return true; + } + + auto& userTableInfo = it->second; + if (userTableInfo->IsBackup) { + ReplyError( + Ydb::StatusIds::BAD_REQUEST, + "Can't read from a backup table"); + return true; + } + + if (!Self->IsMvccEnabled()) { + ReplyError( + Ydb::StatusIds::UNSUPPORTED, + "Cannot use read iterators without mvcc"); + return true; + } + + if (state.IsHeadRead) { + // We want to try and choose a more specific non-repeatable snapshot + if (Self->IsFollower()) { + auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge(); + // Note: during transition follower edge may be unitialized or lag behind + // We assume we can use it when it's not before low watermark + auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev(); + if (maxRepeatable >= Self->GetSnapshotManager().GetLowWatermark()) { + state.ReadVersion = followerEdge; + state.IsHeadRead = !followerRepeatable; + } + } else { + state.ReadVersion = Self->GetMvccTxVersion(EMvccTxMode::ReadOnly); + } + if (!state.ReadVersion.IsMax()) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, + Self->TabletID() << " changed HEAD read to " + << (state.IsHeadRead ? "non-repeatable" : "repeatable") + << " " << state.ReadVersion); + } + } else { + bool snapshotFound = false; + + const ui64 ownerId = state.PathId.OwnerId; + TSnapshotKey snapshotKey( + ownerId, + tableId, + state.ReadVersion.Step, + state.ReadVersion.TxId); + + if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) { + // TODO: do we need to acquire? + setUsingSnapshotFlag = true; + snapshotFound = true; + } + + if (!snapshotFound) { + bool snapshotUnavailable = false; + + if (state.ReadVersion < Self->GetSnapshotManager().GetLowWatermark() || state.ReadVersion.Step == Max<ui64>()) { + snapshotUnavailable = true; + } + + if (Self->IsFollower()) { + auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge(); + auto maxRepeatable = !followerEdge || followerRepeatable ? followerEdge : followerEdge.Prev(); + if (state.ReadVersion > maxRepeatable) { + snapshotUnavailable = true; + } + } else { + auto prioritizedMvccSnapshotReads = Self->GetEnablePrioritizedMvccSnapshotReads(); + TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads); + if (state.ReadVersion >= unreadableEdge) { + LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId); + Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(Ev), ctx); + Self->DeleteReadIterator(readIt); + return true; + } + } + + if (snapshotUnavailable) { + ReplyError( + Ydb::StatusIds::PRECONDITION_FAILED, + TStringBuilder() << "Table id " << tableId << " has no snapshot at " + << state.ReadVersion << " shard " << Self->TabletID() + << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() + << (Self->IsFollower() ? " RO replica" : "")); + return true; + } + } + } + } else { + // Handle system table reads + if (Self->IsFollower()) { + ReplyError( + Ydb::StatusIds::UNSUPPORTED, + "Followers don't support system table reads"); + return true; + } + if (!state.IsHeadRead) { + ReplyError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion); + return true; + } + if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) { + ReplyError( + Ydb::StatusIds::BAD_REQUEST, + "Cannot read from user tables using system tables"); + return true; + } + if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) { + ReplyError( + Ydb::StatusIds::UNSUPPORTED, + TStringBuilder() << "Unsupported result format " + << (int)record.GetResultFormat() << " when reading from system tables"); + return true; + } + if (record.GetTableId().HasSchemaVersion()) { + ReplyError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Cannot request system table at shard " << record.GetTableId().GetOwnerId() + << ", localTid: " << record.GetTableId().GetTableId() + << ", with schema: " << record.GetTableId().GetSchemaVersion()); + return true; + } + + // We don't want this read to interact with other operations + setUsingSnapshotFlag = true; + } + const ui64 tieBreaker = Self->NextTieBreakerIndex++; Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev); Op->BuildExecutionPlan(false); Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); + if (!state.ReadVersion.IsMax()) { + Op->SetMvccSnapshot( + TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId), + /* repeatable = */ state.IsHeadRead ? false : true); + } + if (setUsingSnapshotFlag) { + Op->SetUsingSnapshotFlag(); + } + Ev = nullptr; Op->IncrementInProgress(); } @@ -1845,10 +1953,28 @@ public: } } + void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message) { + Reply = std::make_unique<TEvDataShard::TEvReadResult>(); + SetStatusError(Reply->Record, code, message); + Reply->Record.SetReadId(ReadId.ReadId); + } + void Complete(const TActorContext& ctx) override { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete" << ": at tablet# " << Self->TabletID()); + if (Reply) { + Y_VERIFY(!Op); + auto it = Self->ReadIterators.find(ReadId); + if (it != Self->ReadIterators.end()) { + Y_VERIFY(it->second); + auto& state = *it->second; + SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), Reply.release()); + Self->DeleteReadIterator(it); + } + return; + } + if (!Op) return; @@ -1967,7 +2093,7 @@ public: } if (!snapshotFound) { - bool isMvccReadable = !Self->IsFollower() && state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool isMvccReadable = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); if (!isMvccReadable) { SetStatusError( Result->Record, @@ -2216,6 +2342,14 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } + if (Pipeline.HasProposeDelayers()) { + LWTRACK(ReadWaitProposeDelayers, request->Orbit); + Pipeline.RegisterWaitingReadIterator(readId, request); + DelayedProposeQueue.emplace_back().Reset(ev.Release()); + UpdateProposeQueueSize(); + return; + } + if (Pipeline.HasDrop()) { replyWithError( Ydb::StatusIds::INTERNAL_ERROR, @@ -2249,123 +2383,24 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } TRowVersion readVersion = TRowVersion::Max(); - bool isHeadRead = false; + bool isHeadRead = true; if (record.HasSnapshot()) { readVersion.Step = record.GetSnapshot().GetStep(); readVersion.TxId = record.GetSnapshot().GetTxId(); - if (readVersion.IsMax()) { + if (readVersion.Step == Max<ui64>()) { replyWithError( Ydb::StatusIds::UNSUPPORTED, "invalid snapshot value specified"); return; } + isHeadRead = false; } - if (!IsFollower()) { - if (record.GetTableId().GetOwnerId() != TabletID()) { - // owner is schemeshard, read user table - if (readVersion.IsMax()) { - // transform a HEAD read into some non-repeatable snapshot - readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr); - ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step); - ev->Get()->Record.MutableSnapshot()->SetTxId(readVersion.TxId); - isHeadRead = true; - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed HEAD read into non-repeatable " << readVersion); - } - - TSnapshotKey snapshotKey( - record.GetTableId().GetOwnerId(), - record.GetTableId().GetTableId(), - readVersion.Step, - readVersion.TxId); - - bool snapshotFound = GetSnapshotManager().FindAvailable(snapshotKey); - if (!snapshotFound) { - // check if there is MVCC version and maybe wait - if (readVersion < GetSnapshotManager().GetLowWatermark()) { - replyWithError( - Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "MVCC read " << readVersion - << " bellow low watermark " << GetSnapshotManager().GetLowWatermark()); - return; - } - - // MVCC read is possible, but nead to check MVCC state and if we need to wait - - if (MvccSwitchState == TSwitchState::SWITCHING) { - Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx); - return; - } - - auto prioritizedMvccSnapshotReads = GetEnablePrioritizedMvccSnapshotReads(); - TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads); - if (readVersion >= unreadableEdge) { - LWTRACK(ReadWaitSnapshot, request->Orbit, readVersion.Step, readVersion.TxId); - Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx); - return; - } - - // we found proper MVCC snapshot - snapshotFound = true; - } - - if (!snapshotFound) { - replyWithError( - Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Neither regular nor MVCC snapshot for " << readVersion); - return; - } - } else { - // DS is owner, read system table - if (!readVersion.IsMax()) { - replyWithError( - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Only HEAD read from sys tables is allowed"); - return; - } - - if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) { - replyWithError( - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Only sys tables can be read by localTid, table " - << record.GetTableId().GetTableId()); - return; - } - - if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) { - replyWithError( - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Sys tables can be read only in cellvec format, but requested " - << (int)NKikimrTxDataShard::CELLVEC); - return; - } - - if (record.GetTableId().HasSchemaVersion()) { - replyWithError( - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Requesting system stable owned " << record.GetTableId().GetOwnerId() - << ", localTid: " << record.GetTableId().GetTableId() - << ", with schema: " << record.GetTableId().GetSchemaVersion()); - return; - } - } - } else { - if (record.GetTableId().GetOwnerId() == TabletID()) { - replyWithError( - Ydb::StatusIds::UNSUPPORTED, - "Systable reads on followers are not supported"); - return; - } - - // follower: we can't check snapshot version, because need to sync and to sync - // we need transaction - if (readVersion.IsMax()) { - isHeadRead = true; - } + if (MvccSwitchState == TSwitchState::SWITCHING) { + Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx); + return; } - // Note: iterator is correct and ready to execute - TActorId sessionId; if (readId.Sender.NodeId() != SelfId().NodeId()) { Y_VERIFY_DEBUG(ev->InterconnectSession); @@ -2386,7 +2421,10 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct ReadIterators.emplace( readId, - new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit))); + new TReadIteratorState( + readId, TPathId(record.GetTableId().GetOwnerId(), record.GetTableId().GetTableId()), + sessionId, readVersion, isHeadRead, + AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit))); SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index ee36aef45a7..1f1b57ab246 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -381,6 +381,11 @@ public: if (completeEdge) { Self->PromoteCompleteEdge(completeEdge, txc); + // NOTE: asynchronous indexes are inconsistent by their nature + // We just promote follower read edge to the latest version + // and say it's repeatable, even though we cannot possibly know + // which versions are consistent and repeatable. + Self->GetSnapshotManager().PromoteFollowerReadEdge(completeEdge, true, txc); } return true; diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp index 3ac4eb40b4d..2fa1b814409 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp +++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp @@ -140,6 +140,9 @@ void TDependencyTracker::FlushImmediateWrites() noexcept { } const TDependencyTracker::TDependencyTrackingLogic& TDependencyTracker::GetTrackingLogic() const noexcept { + if (Self->IsFollower()) + return FollowerLogic; + if (Self->IsMvccEnabled()) return MvccLogic; @@ -1074,5 +1077,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::RemoveOperation(const TOp } } +void TDependencyTracker::TFollowerDependencyTrackingLogic::AddOperation(const TOperation::TPtr&) const noexcept { + // all follower operations are readonly and don't conflict +} + +void TDependencyTracker::TFollowerDependencyTrackingLogic::RemoveOperation(const TOperation::TPtr&) const noexcept { + // all follower operations are readonly and don't conflict +} + } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h index d6fab6ee5c6..4c5a3746b02 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.h +++ b/ydb/core/tx/datashard/datashard_dep_tracker.h @@ -81,11 +81,17 @@ private: void RemoveOperation(const TOperation::TPtr& op) const noexcept override; }; + struct TFollowerDependencyTrackingLogic : public TDependencyTrackingLogic { + explicit TFollowerDependencyTrackingLogic(TDependencyTracker& parent) + : TDependencyTrackingLogic(parent) {} + + void AddOperation(const TOperation::TPtr& op) const noexcept override; + void RemoveOperation(const TOperation::TPtr& op) const noexcept override; + }; + public: TDependencyTracker(TDataShard* self) : Self(self) - , DefaultLogic(*this) - , MvccLogic(*this) { } public: @@ -130,7 +136,7 @@ private: const TDependencyTrackingLogic& GetTrackingLogic() const noexcept; private: - TDataShard* Self; + TDataShard* const Self; // Temporary vectors for building dependencies TKeys TmpRead; TKeys TmpWrite; @@ -158,8 +164,9 @@ private: TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedPlannedWrites; TIntrusiveList<TOperation, TOperationDelayedWriteListTag> DelayedImmediateWrites; - const TDefaultDependencyTrackingLogic DefaultLogic; - const TMvccDependencyTrackingLogic MvccLogic; + const TDefaultDependencyTrackingLogic DefaultLogic{ *this }; + const TMvccDependencyTrackingLogic MvccLogic{ *this }; + const TFollowerDependencyTrackingLogic FollowerLogic{ *this }; }; } // namespace NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b4b580226da..1940eb769b5 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -224,6 +224,7 @@ class TDataShard class TTxVolatileTxAbort; class TTxCdcStreamScanRun; class TTxCdcStreamScanProgress; + class TTxUpdateFollowerReadEdge; template <typename T> friend class TTxDirectBase; class TTxUploadRows; @@ -1060,6 +1061,14 @@ class TDataShard Sys_LastLoanTableTid, // 41 Last tid that we used in LoanTable + // The last step:txId that is unconditionally readable on followers + // without producing possibly inconsistent results. When repeatable + // is set leader will also never add new writes to this edge, making + // it possible to use the edge as a local snapshot. + SysMvcc_FollowerReadEdgeStep = 42, + SysMvcc_FollowerReadEdgeTxId = 43, + SysMvcc_FollowerReadEdgeRepeatable = 44, + // reserved SysPipeline_Flags = 1000, SysPipeline_LimitActiveTx, @@ -1799,6 +1808,13 @@ public: // Returns true when datashard is working in mvcc mode bool IsMvccEnabled() const; + // Calculates current follower read edge + std::tuple<TRowVersion, bool, ui64> CalculateFollowerReadEdge() const; + + // Promotes current follower read edge + bool PromoteFollowerReadEdge(TTransactionContext& txc); + bool PromoteFollowerReadEdge(); + // Returns a suitable row version for performing a transaction TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const; @@ -2234,6 +2250,7 @@ private: // For follower only struct TFollowerState { + ui64 LastSysUpdate = 0; ui64 LastSchemeUpdate = 0; ui64 LastSnapshotsUpdate = 0; }; @@ -2340,7 +2357,7 @@ private: }; TProposeQueue ProposeQueue; - TVector<THolder<TEvDataShard::TEvProposeTransaction::THandle>> DelayedProposeQueue; + TVector<THolder<IEventHandle>> DelayedProposeQueue; TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig; THolder<NTabletPipe::IClientCache> PipeClientCache; @@ -2658,6 +2675,8 @@ private: NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver; + bool UpdateFollowerReadEdgePending = false; + public: auto& GetLockChangeRecords() { return LockChangeRecords; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 94e4646ed12..8a6e822d019 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -11,8 +11,6 @@ namespace NKikimr { namespace NDataShard { -#define LOAD_SYS_UI64(db, row, value) if (!TDataShard::SysGetUi64(db, row, value)) return false; - TPipeline::TPipeline(TDataShard * self) : Self(self) , DepTracker(self) @@ -45,14 +43,19 @@ bool TPipeline::Load(NIceDb::TNiceDb& db) { using Schema = TDataShard::Schema; Y_VERIFY(!SchemaTx); - LOAD_SYS_UI64(db, Schema::Sys_LastPlannedStep, LastPlannedTx.Step); - LOAD_SYS_UI64(db, Schema::Sys_LastPlannedTx, LastPlannedTx.TxId); - LOAD_SYS_UI64(db, Schema::Sys_LastCompleteStep, LastCompleteTx.Step); - LOAD_SYS_UI64(db, Schema::Sys_LastCompleteTx, LastCompleteTx.TxId); - LOAD_SYS_UI64(db, Schema::Sys_AliveStep, KeepSchemaStep); - LOAD_SYS_UI64(db, Schema::SysPipeline_Flags, Config.Flags); - LOAD_SYS_UI64(db, Schema::SysPipeline_LimitActiveTx, Config.LimitActiveTx); - LOAD_SYS_UI64(db, Schema::SysPipeline_LimitDataTxCache, Config.LimitDataTxCache); + + bool ready = true; + ready &= Self->SysGetUi64(db, Schema::Sys_LastPlannedStep, LastPlannedTx.Step); + ready &= Self->SysGetUi64(db, Schema::Sys_LastPlannedTx, LastPlannedTx.TxId); + ready &= Self->SysGetUi64(db, Schema::Sys_LastCompleteStep, LastCompleteTx.Step); + ready &= Self->SysGetUi64(db, Schema::Sys_LastCompleteTx, LastCompleteTx.TxId); + ready &= Self->SysGetUi64(db, Schema::Sys_AliveStep, KeepSchemaStep); + ready &= Self->SysGetUi64(db, Schema::SysPipeline_Flags, Config.Flags); + ready &= Self->SysGetUi64(db, Schema::SysPipeline_LimitActiveTx, Config.LimitActiveTx); + ready &= Self->SysGetUi64(db, Schema::SysPipeline_LimitDataTxCache, Config.LimitDataTxCache); + if (!ready) { + return false; + } Config.Validate(); UtmostCompleteTx = LastCompleteTx; @@ -941,6 +944,10 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc, Self->TransQueue.RemoveTx(db, *op); RemoveInReadSets(op, db); + if (Self->IsMvccEnabled()) { + Self->PromoteFollowerReadEdge(txc); + } + while (!DelayedAcks.empty() && DelayedAcks.begin()->first.Step <= OutdatedReadSetStep()) { @@ -1960,7 +1967,7 @@ void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) { } bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const { - if (!Self->IsMvccEnabled() || !op->IsMvccSnapshotRead() || op->HasWaitCompletionFlag()) + if (Self->IsFollower() || !Self->IsMvccEnabled() || !op->IsMvccSnapshotRead() || op->HasWaitCompletionFlag()) return true; // don't send errors early diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp index 9b489f0f108..b328f4326dd 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_snapshots.cpp @@ -15,13 +15,31 @@ void TSnapshotManager::Reset() { IncompleteEdge = TRowVersion::Min(); CompleteEdge = TRowVersion::Min(); LowWatermark = TRowVersion::Min(); + ImmediateWriteEdge = TRowVersion::Min(); + ImmediateWriteEdgeReplied = TRowVersion::Min(); + UnprotectedReadEdge = TRowVersion::Min(); CommittedCompleteEdge = TRowVersion::Min(); + FollowerReadEdge = TRowVersion::Min(); + FollowerReadEdgeRepeatable = false; Snapshots.clear(); } bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { + bool ready = true; + + ready &= ReloadSys(db); + ready &= ReloadSnapshots(db); + + if (ready) { + Self->SetCounter(COUNTER_MVCC_ENABLED, IsMvccEnabled() ? 1 : 0); + } + + return ready; +} + +bool TSnapshotManager::ReloadSys(NIceDb::TNiceDb& db) { using Schema = TDataShard::Schema; bool ready = true; @@ -31,63 +49,33 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { TRowVersion incompleteEdge = TRowVersion::Min(); TRowVersion lowWatermark = TRowVersion::Min(); TRowVersion immediateWriteEdge = TRowVersion::Min(); + TRowVersion followerReadEdge = TRowVersion::Min(); ui32 mvccState = 0; ui64 keepSnapshotTimeout = 0; ui64 unprotectedReads = 0; - - TSnapshotMap snapshots; + ui64 followerReadEdgeRepeatable = 0; ready &= Self->SysGetUi64(db, Schema::Sys_MinWriteVersionStep, minWriteVersion.Step); ready &= Self->SysGetUi64(db, Schema::Sys_MinWriteVersionTxId, minWriteVersion.TxId); - if (!Self->IsFollower()) { - // We don't currently support mvcc on the follower - ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step); - ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId); - } - - { - auto rowset = db.Table<Schema::Snapshots>().Range().Select(); - if (rowset.IsReady()) { - while (!rowset.EndOfSet()) { - ui64 oid = rowset.GetValue<Schema::Snapshots::Oid>(); - ui64 tid = rowset.GetValue<Schema::Snapshots::Tid>(); - ui64 step = rowset.GetValue<Schema::Snapshots::Step>(); - ui64 txId = rowset.GetValue<Schema::Snapshots::TxId>(); - TString name = rowset.GetValue<Schema::Snapshots::Name>(); - ui64 flags = rowset.GetValue<Schema::Snapshots::Flags>(); - ui64 timeout_ms = rowset.GetValue<Schema::Snapshots::TimeoutMs>(); - - TSnapshotKey key(oid, tid, step, txId); - - auto res = snapshots.emplace( - std::piecewise_construct, - std::forward_as_tuple(key), - std::forward_as_tuple(key, std::move(name), flags, TDuration::MilliSeconds(timeout_ms))); - Y_VERIFY_S(res.second, "Unexpected duplicate snapshot: " << key); - - if (!rowset.Next()) { - ready = false; - break; - } - } - } else { - ready = false; - } - } + ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeStep, followerReadEdge.Step); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeTxId, followerReadEdge.TxId); + ready &= Self->SysGetUi64(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, followerReadEdgeRepeatable); if (ready) { - // We have a consistent view of snapshots table, apply + // We have a consistent view of settings, apply MinWriteVersion = minWriteVersion; MvccState = static_cast<EMvccState>(mvccState); KeepSnapshotTimeout = keepSnapshotTimeout; @@ -105,14 +93,46 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) { ImmediateWriteEdgeReplied.TxId = Max<ui64>(); } CommittedCompleteEdge = completeEdge; - Snapshots = std::move(snapshots); + FollowerReadEdge = followerReadEdge; + FollowerReadEdgeRepeatable = (followerReadEdgeRepeatable != 0); } - if (ready) { - Self->SetCounter(COUNTER_MVCC_ENABLED, IsMvccEnabled() ? 1 : 0); + return ready; +} + +bool TSnapshotManager::ReloadSnapshots(NIceDb::TNiceDb& db) { + using Schema = TDataShard::Schema; + + TSnapshotMap snapshots; + + auto rowset = db.Table<Schema::Snapshots>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + ui64 oid = rowset.GetValue<Schema::Snapshots::Oid>(); + ui64 tid = rowset.GetValue<Schema::Snapshots::Tid>(); + ui64 step = rowset.GetValue<Schema::Snapshots::Step>(); + ui64 txId = rowset.GetValue<Schema::Snapshots::TxId>(); + TString name = rowset.GetValue<Schema::Snapshots::Name>(); + ui64 flags = rowset.GetValue<Schema::Snapshots::Flags>(); + ui64 timeout_ms = rowset.GetValue<Schema::Snapshots::TimeoutMs>(); + + TSnapshotKey key(oid, tid, step, txId); + + auto res = snapshots.emplace( + std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(key, std::move(name), flags, TDuration::MilliSeconds(timeout_ms))); + Y_VERIFY_S(res.second, "Unexpected duplicate snapshot: " << key); + + if (!rowset.Next()) { + return false; + } } - return ready; + Snapshots = std::move(snapshots); + return true; } void TSnapshotManager::InitExpireQueue(TInstant now) { @@ -299,6 +319,33 @@ void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedRea }); } +std::pair<TRowVersion, bool> TSnapshotManager::GetFollowerReadEdge() const { + return { FollowerReadEdge, FollowerReadEdgeRepeatable }; +} + +bool TSnapshotManager::PromoteFollowerReadEdge(const TRowVersion& version, bool repeatable, TTransactionContext& txc) { + using Schema = TDataShard::Schema; + + if (IsMvccEnabled()) { + if (FollowerReadEdge < version) { + NIceDb::TNiceDb db(txc.DB); + Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeStep, version.Step); + Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeTxId, version.TxId); + Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, ui64(repeatable ? 1 : 0)); + FollowerReadEdge = version; + FollowerReadEdgeRepeatable = repeatable; + return true; + } else if (FollowerReadEdge == version && repeatable && !FollowerReadEdgeRepeatable) { + NIceDb::TNiceDb db(txc.DB); + Self->PersistSys(db, Schema::SysMvcc_FollowerReadEdgeRepeatable, ui64(repeatable ? 1 : 0)); + FollowerReadEdgeRepeatable = repeatable; + return true; + } + } + + return false; +} + void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) { using Schema = TDataShard::Schema; @@ -696,6 +743,7 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext& if (CompleteEdge.Step < ImmediateWriteEdgeReplied.Step) { PromoteCompleteEdge(ImmediateWriteEdgeReplied.Step, txc); + Self->PromoteFollowerReadEdge(txc); } // Calculate the maximum version where we may have written something @@ -705,7 +753,13 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext& maxWriteVersion = ImmediateWriteEdge; } - removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion)); + // Make sure we don't leave followers without any repeatable read version + TRowVersion maxRepeatableRead = FollowerReadEdge; + if (maxRepeatableRead && !FollowerReadEdgeRepeatable) { + maxRepeatableRead = maxRepeatableRead.Prev(); + } + + removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion, maxRepeatableRead)); LastAdvanceWatermark = NActors::TActivationContext::Monotonic(); return removed; diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h index eb868fa4dbc..44e17632656 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.h +++ b/ydb/core/tx/datashard/datashard_snapshots.h @@ -109,6 +109,8 @@ public: void Reset(); bool Reload(NIceDb::TNiceDb& db); + bool ReloadSys(NIceDb::TNiceDb& db); + bool ReloadSnapshots(NIceDb::TNiceDb& db); void InitExpireQueue(TInstant now); @@ -144,6 +146,9 @@ public: bool IsPerformedUnprotectedReadsCommitted() const; void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc); + std::pair<TRowVersion, bool> GetFollowerReadEdge() const; + bool PromoteFollowerReadEdge(const TRowVersion& version, bool repeatable, TTransactionContext& txc); + EMvccState GetMvccState() const { return MvccState; } @@ -228,6 +233,8 @@ private: TRowVersion UnprotectedReadEdge = TRowVersion::Min(); TRowVersion CommittedCompleteEdge = TRowVersion::Min(); + TRowVersion FollowerReadEdge = TRowVersion::Min(); + bool FollowerReadEdgeRepeatable = false; TSnapshotMap Snapshots; diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp index 41dd9f45c4f..2d33d53ee3e 100644 --- a/ydb/core/tx/datashard/datashard_split_dst.cpp +++ b/ydb/core/tx/datashard/datashard_split_dst.cpp @@ -281,6 +281,10 @@ public: kv.second.OptimizeSplitKeys(rdb); } + if (mvcc) { + Self->PromoteFollowerReadEdge(txc); + } + Self->State = TShardState::Ready; Self->PersistSys(db, Schema::Sys_State, Self->State); } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index f2938f5ad2a..829c2589301 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -2312,7 +2312,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { TestReadKey(NKikimrTxDataShard::CELLVEC, true); } - Y_UNIT_TEST(ShouldNotReadMvccFromFollower) { + Y_UNIT_TEST(ShouldNotReadFutureMvccFromFollower) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -2326,7 +2326,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { AddKeyQuery(*request, {3, 3, 3}); auto readResult = helper.SendRead("table-1", request.release()); const auto& record = readResult->Record; - UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED); + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::PRECONDITION_FAILED); } Y_UNIT_TEST(ShouldReadHeadFromFollower) { @@ -2791,24 +2791,27 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { switch (event->GetTypeRewrite()) { case TEvMediatorTimecast::EvUpdate: { + auto* update = event->Get<TEvMediatorTimecast::TEvUpdate>(); + lastStep = update->Record.GetTimeBarrier(); + Cerr << "... observed TEvUpdate(" << lastStep << ")" << Endl; if (captureTimecast) { - auto update = event->Get<TEvMediatorTimecast::TEvUpdate>(); - lastStep = update->Record.GetTimeBarrier(); Cerr << "---- dropped EvUpdate ----" << Endl; return TTestActorRuntime::EEventAction::DROP; } break; } case TEvMediatorTimecast::EvWaitPlanStep: { + auto* waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); + Cerr << "... observed TEvWaitPlanStep(" << waitEvent->PlanStep << ")" << Endl; if (captureWaitNotify) { - auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); waitPlanStep = waitEvent->PlanStep; } break; } case TEvMediatorTimecast::EvNotifyPlanStep: { + auto* notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>(); + Cerr << "... observed TEvNotifyPlanStep(" << notifyEvent->PlanStep << ")" << Endl; if (captureWaitNotify) { - auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>(); notifyPlanStep = notifyEvent->PlanStep; } break; @@ -2835,7 +2838,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { captureWaitNotify = true; // future snapshot - snapshot = TRowVersion(lastStep + 1000, Max<ui64>()); + snapshot = TRowVersion(lastStep + 3000, Max<ui64>()); auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot); AddKeyQuery(*request1, {3, 3, 3}); @@ -2845,7 +2848,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto readResult1 = helper.SendRead("table-1", request1.release()); - waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep"); + waitFor([&]{ return notifyPlanStep >= snapshot.Step; }, TStringBuilder() << "intercepted TEvNotifyPlanStep for snapshot " << snapshot); UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step); UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step); @@ -2917,24 +2920,27 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { switch (event->GetTypeRewrite()) { case TEvMediatorTimecast::EvUpdate: { + auto* update = event->Get<TEvMediatorTimecast::TEvUpdate>(); + lastStep = update->Record.GetTimeBarrier(); + Cerr << "... observed TEvUpdate(" << lastStep << ")" << Endl; if (captureTimecast) { - auto update = event->Get<TEvMediatorTimecast::TEvUpdate>(); - lastStep = update->Record.GetTimeBarrier(); Cerr << "---- dropped EvUpdate ----" << Endl; return TTestActorRuntime::EEventAction::DROP; } break; } case TEvMediatorTimecast::EvWaitPlanStep: { + auto* waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); + Cerr << "... observed TEvWaitPlanStep(" << waitEvent->PlanStep << ")" << Endl; if (captureWaitNotify) { - auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); waitPlanStep = waitEvent->PlanStep; } break; } case TEvMediatorTimecast::EvNotifyPlanStep: { + auto* notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>(); + Cerr << "... observed TEvNotifyPlanStep(" << notifyEvent->PlanStep << ")" << Endl; if (captureWaitNotify) { - auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>(); notifyPlanStep = notifyEvent->PlanStep; } break; @@ -2965,7 +2971,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { captureWaitNotify = true; // future snapshot - snapshot = TRowVersion(lastStep + 1000, Max<ui64>()); + snapshot = TRowVersion(lastStep + 3000, Max<ui64>()); auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot); AddKeyQuery(*request1, {3, 3, 3}); @@ -2975,13 +2981,13 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.SendReadAsync("table-1", request1.release()); - waitFor([&]{ return waitPlanStep != 0; }, "intercepted TEvWaitPlanStep"); + waitFor([&]{ return waitPlanStep >= snapshot.Step; }, "intercepted TEvWaitPlanStep"); UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step); - UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, 0); + UNIT_ASSERT_LT(notifyPlanStep, snapshot.Step); helper.SendCancel("table-1", 1); - waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep"); + waitFor([&]{ return notifyPlanStep >= snapshot.Step; }, "intercepted TEvNotifyPlanStep"); UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step); UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step); @@ -3522,13 +3528,13 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { auto readResult = helper.SendRead("table-1", request.release()); const auto& record = readResult->Record; - UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::BAD_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED); } }; Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { Y_UNIT_TEST(ShouldCalculateQuota) { - NDataShard::TReadIteratorState state({}, false, {}); + NDataShard::TReadIteratorState state(TReadIteratorId({}, 0), TPathId(0, 0), {}, TRowVersion::Max(), true, {}); state.Quota.Rows = 100; state.Quota.Bytes = 1000; state.ConsumeSeqNo(10, 100); // seqno1 diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 908bfcd853d..0dbd327a8f5 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -81,6 +81,8 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, if (op->IsAborted()) { // Make sure we confirm aborts with a commit op->SetWaitCompletionFlag(true); + } else if (DataShard.IsFollower()) { + // It doesn't matter whether we wait or not } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) { auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc); diff --git a/ydb/core/tx/datashard/follower_edge.cpp b/ydb/core/tx/datashard/follower_edge.cpp new file mode 100644 index 00000000000..846c5784d2c --- /dev/null +++ b/ydb/core/tx/datashard/follower_edge.cpp @@ -0,0 +1,98 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +std::tuple<TRowVersion, bool, ui64> TDataShard::CalculateFollowerReadEdge() const { + Y_VERIFY(!IsFollower()); + Y_VERIFY_DEBUG(IsMvccEnabled()); + + for (auto order : TransQueue.GetPlan()) { + // When we have planned operations we assume the first one may be used + // for new writes, so we mark is as non-repeatable. We could skip + // readonly operations, but there's little benefit in that, and it's + // complicated to determine which is the first readable given we may + // have executed some out of order. + return { TRowVersion(order.Step, order.TxId), false, 0 }; + } + + // This is the max version where we had any writes + TRowVersion maxWrite(SnapshotManager.GetCompleteEdge().Step, Max<ui64>()); + if (maxWrite < SnapshotManager.GetImmediateWriteEdge()) { + maxWrite = SnapshotManager.GetImmediateWriteEdge(); + } + + // This is the next version that would be used for new writes + TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite); + + if (maxWrite < nextWrite) { + return { maxWrite, true, 0 }; + } + + TRowVersion maxObserved(GetMaxObservedStep(), Max<ui64>()); + if (maxObserved < maxWrite) { + return { maxObserved, true, maxWrite.Step }; + } + + return { maxWrite, false, maxWrite.Next().Step }; +} + +bool TDataShard::PromoteFollowerReadEdge(TTransactionContext& txc) { + Y_VERIFY(!IsFollower()); + + if (IsMvccEnabled()) { + auto [version, repeatable, waitStep] = CalculateFollowerReadEdge(); + + if (waitStep) { + WaitPlanStep(waitStep); + } + + return SnapshotManager.PromoteFollowerReadEdge(version, repeatable, txc); + } + + return false; +} + +class TDataShard::TTxUpdateFollowerReadEdge + : public NTabletFlatExecutor::TTransactionBase<TDataShard> +{ +public: + TTxUpdateFollowerReadEdge(TDataShard* self) + : TBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_UPDATE_FOLLOWER_READ_EDGE; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_VERIFY(Self->UpdateFollowerReadEdgePending); + Self->UpdateFollowerReadEdgePending = false; + Self->PromoteFollowerReadEdge(txc); + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } +}; + +bool TDataShard::PromoteFollowerReadEdge() { + Y_VERIFY(!IsFollower()); + + if (IsMvccEnabled()) { + auto [currentEdge, currentRepeatable] = SnapshotManager.GetFollowerReadEdge(); + auto [nextEdge, nextRepeatable, waitStep] = CalculateFollowerReadEdge(); + + if (currentEdge < nextEdge || currentEdge == nextEdge && !currentRepeatable && nextRepeatable) { + if (!UpdateFollowerReadEdgePending) { + UpdateFollowerReadEdgePending = true; + Execute(new TTxUpdateFollowerReadEdge(this)); + } + return true; + } else if (waitStep) { + WaitPlanStep(waitStep); + } + } + + return false; +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h index e63f21a9016..b7d96b79bcb 100644 --- a/ydb/core/tx/datashard/probes.h +++ b/ydb/core/tx/datashard/probes.h @@ -51,6 +51,10 @@ GROUPS("DataShard"), \ TYPES(), \ NAMES()) \ + PROBE(ReadWaitProposeDelayers, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ PROBE(ReadWaitSnapshot, \ GROUPS("DataShard"), \ TYPES(ui64, ui64), \ diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index eb87c540c29..f57dd1ac839 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -64,8 +64,14 @@ struct TReadIteratorState { }; public: - TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts, NLWTrace::TOrbit&& orbit = {}) - : IsHeadRead(isHeadRead) + TReadIteratorState( + const TReadIteratorId& readId, const TPathId& pathId, + const TActorId& sessionId, const TRowVersion& readVersion, bool isHeadRead, + TMonotonic ts, NLWTrace::TOrbit&& orbit = {}) + : ReadId(readId.ReadId) + , PathId(pathId) + , ReadVersion(readVersion) + , IsHeadRead(isHeadRead) , SessionId(sessionId) , StartTs(ts) , Orbit(std::move(orbit)) @@ -154,11 +160,11 @@ public: // Data from original request // - ui64 ReadId = 0; + ui64 ReadId; TPathId PathId; std::vector<NTable::TTag> Columns; - TRowVersion ReadVersion = TRowVersion::Max(); - bool IsHeadRead = false; + TRowVersion ReadVersion; + bool IsHeadRead; ui64 LockId = 0; ui32 LockNodeId = 0; TLockInfo::TPtr Lock; |