diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-08-22 12:12:00 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-08-22 12:12:00 +0300 |
commit | 90ba2f22fde909f48fae87f8d00773acc6f793a8 (patch) | |
tree | 3f4f77d232818413e9beaf9086c44c7437e9475f | |
parent | 80386d4cd0c19b3d6930768677e46f2d25536bcc (diff) | |
download | ydb-90ba2f22fde909f48fae87f8d00773acc6f793a8.tar.gz |
rewrite TEvRead to pipeline: properly handle dependencies, convert HEAD to MVCC, properly set locks
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/check_read_unit.cpp | 38 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 52 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 1077 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__stats.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_read_operation.h | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 456 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit_ctors.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit_kind.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_op_unit.cpp | 48 |
17 files changed, 1395 insertions, 350 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 03c967165c4..f6690afa4b9 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -75,6 +75,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp @@ -186,6 +187,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp diff --git a/ydb/core/tx/datashard/check_read_unit.cpp b/ydb/core/tx/datashard/check_read_unit.cpp new file mode 100644 index 00000000000..28734b52205 --- /dev/null +++ b/ydb/core/tx/datashard/check_read_unit.cpp @@ -0,0 +1,38 @@ +#include "datashard_read_operation.h" +#include "datashard_pipeline.h" +#include "execution_unit_ctors.h" + +namespace NKikimr::NDataShard { + +class TCheckReadUnit : public TExecutionUnit { +public: + TCheckReadUnit(TDataShard& self, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::CheckRead, true, self, pipeline) + { + } + + ~TCheckReadUnit() = default; + + bool IsReadyToExecute(TOperation::TPtr op) const override { + return 
!op->HasRuntimeConflicts(); + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { + IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get()); + Y_VERIFY(readOperation != nullptr); + + readOperation->CheckRequestAndInit(txc, ctx); + return EExecutionStatus::Executed; + } + + void Complete(TOperation::TPtr, const TActorContext&) override { + // CheckRequestAndInit either already failed request and replied to user + // or prepared operation for further execution + } +}; + +THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard& self, TPipeline& pipeline) { + return THolder(new TCheckReadUnit(self, pipeline)); +} + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ee76ffda77d..3540e87017a 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1641,36 +1641,54 @@ void TDataShard::SendImmediateWriteResult( } } -void TDataShard::SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie) { +void TDataShard::SendImmediateReadResult( + TMonotonic readTime, + const TActorId& target, + IEventBase* event, + ui64 cookie, + const TActorId& sessionId) +{ if (IsFollower() || !ReadOnlyLeaseEnabled()) { // We just send possibly stale result (old behavior) - Send(target, event, 0, cookie); + if (!sessionId) { + Send(target, event, 0, cookie); + } else { + SendViaSession(sessionId, target, SelfId(), event); + } return; } struct TSendState : public TThrRefBase { - TActorId Target; - THolder<IEventBase> Event; - ui64 Cookie; - - TSendState(const TActorId& target, IEventBase* event, ui64 cookie) - : Target(target) - , Event(event) - , Cookie(cookie) - { } + THolder<IEventHandle> Ev; + + TSendState(const TActorId& sessionId, const TActorId& target, const TActorId& src, IEventBase* event, ui64 cookie) + { + const ui32 flags = 0; + Ev = 
MakeHolder<IEventHandle>(target, src, event, flags, cookie); + + if (sessionId) { + Ev->Rewrite(TEvInterconnect::EvForward, sessionId); + } + } }; if (!readTime) { readTime = AppData()->MonotonicTimeProvider->Now(); } - Executor()->ConfirmReadOnlyLease(readTime, [this, state = MakeIntrusive<TSendState>(target, event, cookie)] { - Send(state->Target, state->Event.Release(), 0, state->Cookie); + Executor()->ConfirmReadOnlyLease(readTime, + [state = MakeIntrusive<TSendState>(sessionId, target, SelfId(), event, cookie)] { + TActivationContext::Send(state->Ev.Release()); }); } -void TDataShard::SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie) { - SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie); +void TDataShard::SendImmediateReadResult( + const TActorId& target, + IEventBase* event, + ui64 cookie, + const TActorId& sessionId) +{ + SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie, sessionId); } void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) { @@ -1931,7 +1949,9 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, rejectReasons.push_back("decided to reject due to given RejectProbability"); } - size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting()); + size_t totalInFly = + ReadIteratorsInFly() + TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + + ProposeQueue.Size() + TxWaiting(); if (totalInFly > GetMaxTxInFly()) { reject = true; rejectReasons.push_back("MaxTxInFly was exceeded"); diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index b0202b57d97..0efe5a5ab06 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_read_operation.h" #include <ydb/core/formats/arrow_batch_builder.h> @@ -219,6 +220,8 @@ 
class TReader { ui64 BytesInResult = 0; + bool InvisibleRowSkipsMet = false; + NHPTimer::STime StartTime; NHPTimer::STime EndTime; @@ -243,7 +246,12 @@ public: EndTime = StartTime; } - EReadStatus ReadRange(TTransactionContext& txc, const TActorContext& ctx, const TSerializedTableRange& range) { + EReadStatus ReadRange( + TTransactionContext& txc, + const TActorContext& ctx, + const TSerializedTableRange& range, + size_t index) + { bool fromInclusive; TSerializedCellVec keyFromCells; if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) { @@ -251,7 +259,11 @@ public: keyFromCells = TSerializedCellVec(State.LastProcessedKey); } else { fromInclusive = range.FromInclusive; - keyFromCells = TSerializedCellVec(range.From); + if (index < State.Request->Ranges.size() && range.From.GetCells().size() != TableInfo.KeyColumnCount) { + keyFromCells = State.FromKeys[index]; + } else { + keyFromCells = TSerializedCellVec(range.From); + } } const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive); @@ -290,15 +302,20 @@ public: return result; } - EReadStatus ReadKey(TTransactionContext& txc, const TActorContext& ctx, const TSerializedCellVec& keyCells) { + EReadStatus ReadKey( + TTransactionContext& txc, + const TActorContext& ctx, + const TSerializedCellVec& keyCells, + size_t keyIndex) + { if (keyCells.GetCells().size() != TableInfo.KeyColumnCount) { // key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf] TSerializedTableRange range; - range.From = keyCells; + range.From = State.Keys[keyIndex]; range.To = keyCells; range.ToInclusive = true; range.FromInclusive = true; - return ReadRange(txc, ctx, range); + return ReadRange(txc, ctx, range, State.Request->Ranges.size()); } if (ColumnTypes.empty()) { @@ -316,6 +333,7 @@ public: NTable::TSelectStats stats; auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion); RowsSinceLastCheck += 1 + 
stats.InvisibleRowSkips; + InvisibleRowSkipsMet |= stats.InvisibleRowSkips > 0; if (ready == NTable::EReady::Page) { return EReadStatus::NeedData; } @@ -342,7 +360,7 @@ public: return true; const auto& range = State.Request->Ranges[FirstUnprocessedQuery]; - auto status = ReadRange(txc, ctx, range); + auto status = ReadRange(txc, ctx, range, FirstUnprocessedQuery); switch (status) { case EReadStatus::Done: continue; @@ -362,7 +380,7 @@ public: return true; const auto& key = State.Request->Keys[FirstUnprocessedQuery]; - auto status = ReadKey(txc, ctx, key); + auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery); switch (status) { case EReadStatus::Done: continue; @@ -452,6 +470,9 @@ public: record.SetReadId(State.ReadId); record.SetSeqNo(State.SeqNo + 1); + record.MutableSnapshot()->SetStep(State.ReadVersion.Step); + record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId); + NKikimrTxDataShard::TReadContinuationToken continuationToken; continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery); @@ -471,6 +492,7 @@ public: } ui64 GetRowsRead() const { return RowsRead; } + bool HasInvisibleRowSkips() const { return InvisibleRowSkipsMet; } private: bool OutOfQuota() const { @@ -528,6 +550,7 @@ private: BlockBuilder.AddRow(TDbTupleRef(), rowValues); ++RowsRead; + InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0; RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats); if (ShouldStop()) { return EReadStatus::StoppedByLimit; @@ -536,6 +559,7 @@ private: // last iteration to Page or Gone also might have deleted or invisible rows RowsSinceLastCheck += ResetRowStats(iter->Stats); + InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0; // TODO: consider restart when Page and too few data read // (how much is too few, less than user's limit?) 
@@ -556,25 +580,49 @@ const NHPTimer::STime TReader::MaxCyclesPerIteration = } // namespace -class TDataShard::TTxRead : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +class TDataShard::TReadOperation : public TOperation, public IReadOperation { + TDataShard* Self; TActorId Sender; std::shared_ptr<TEvDataShard::TEvRead> Request; + NMiniKQL::IEngineFlat::TValidationInfo ValidationInfo; + + size_t ExecuteCount = 0; + bool ResultSent = false; + std::unique_ptr<TEvDataShard::TEvReadResult> Result; - // on each Execute() set by CheckRequestAndInit std::unique_ptr<IBlockBuilder> BlockBuilder; TShortTableInfo TableInfo; std::unique_ptr<TReader> Reader; + static constexpr ui32 Flags = NTxDataShard::TTxFlags::ReadOnly | NTxDataShard::TTxFlags::Immediate; + public: - TTxRead(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev) - : TBase(ds) + TReadOperation(TDataShard* ds, ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev) + : TOperation(TBasicOpInfo(txId, EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex)) + , Self(ds) , Sender(ev->Sender) , Request(ev->Release().Release()) {} - TTxType GetTxType() const override { return TXTYPE_READ; } + void BuildExecutionPlan(bool loaded) override + { + Y_VERIFY(GetExecutionPlan().empty()); + Y_VERIFY(!loaded); + + TVector<EExecutionUnitKind> plan; + plan.push_back(EExecutionUnitKind::CheckRead); + plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); + plan.push_back(EExecutionUnitKind::ExecuteRead); + plan.push_back(EExecutionUnitKind::CompletedOperations); + + RewriteExecutionPlan(plan); + } + + const NMiniKQL::IEngineFlat::TValidationInfo& GetKeysInfo() const override { + return ValidationInfo; + } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { TReadIteratorId readId(Sender, Request->Record.GetReadId()); @@ -583,12 +631,13 @@ public: // iterator has been aborted return true; } - - Y_ASSERT(it->second); + Y_VERIFY(it->second); auto& state = 
*it->second; - state.State = TReadIteratorState::EState::Init; + Y_VERIFY(state.State == TReadIteratorState::EState::Executing); - Result.reset(new TEvDataShard::TEvReadResult()); + ++ExecuteCount; + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Execute read# " << ExecuteCount + << ", request: " << Request->Record); switch (Self->State) { case TShardState::Ready: @@ -637,122 +686,92 @@ public: return true; } - // TODO: in case of restart we recheck request and rebuild state, - // I believe it is needed because between restart schema can change, - // table can be deleted, version can gone, etc - bool finished = finished; - CheckRequestAndInit(txc, ctx, state, finished); - if (state.State != TReadIteratorState::EState::Executing) { - return finished; - } + // we need to check that scheme version is still correct, table presents and + // version is still available - Y_ASSERT(Reader); + if (state.PathId.OwnerId != Self->TabletID()) { + // owner is schemeshard, read user table + auto tableId = state.PathId.LocalPathId; + auto it = Self->TableInfos.find(tableId); + if (it == Self->TableInfos.end()) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Unknown table id: " << tableId); + return true; + } + auto& userTableInfo = it->second; - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read: " << Request->Record); + const ui64 ownerId = state.PathId.OwnerId; + TSnapshotKey snapshotKey( + ownerId, + tableId, + state.ReadVersion.Step, + state.ReadVersion.TxId); + + bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool allowMvcc = isMvccVersion && !Self->IsFollower(); + bool snapshotFound = false; + if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) { + // TODO: do we need to acquire? 
+ snapshotFound = true; + } else if (allowMvcc) { + snapshotFound = true; + } + + if (!snapshotFound) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Table id " << tableId << " lost snapshot at " + << state.ReadVersion << " shard " << Self->TabletID() + << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() + << (Self->IsFollower() ? " RO replica" : "")); + return true; + } - bool res = Reader->Read(txc, ctx); - if (res && state.Request->Record.HasLockTxId()) { - // note that we set locks only when finish transaction, i.e. we have read something - // without page faults, etc and really finish transaction - auto lock = AcquireLock(ctx, state); - if (!lock) { - // TODO: clarify, when AcquireLock might fail and if we can do anything with it - SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Failed to set lock"); + if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::SCHEME_ERROR, + TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion() + << ", requested table schemaversion " << state.SchemaVersion); return true; } } - return res; + if (!Read(txc, ctx, state)) + return false; + + if (Request->Record.HasLockTxId()) { + // note that we set locks only when first read finish transaction, + // i.e. 
we have read something without page faults + AcquireLock(ctx, state); + } + + Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + return true; } - void Complete(const TActorContext& ctx) override { + void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override { TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { - // the one who removed the iterator should have reply to user - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " has been invalidated before TTxRead::Complete()"); + // iterator has been aborted + Abort(EExecutionUnitKind::CompletedOperations); return; } - Y_VERIFY(it->second); auto& state = *it->second; + Y_VERIFY(state.State == TReadIteratorState::EState::Init); - if (!Result) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxRead::Execute() finished without Result, aborting"); - Self->DeleteReadIterator(it); - - Result.reset(new TEvDataShard::TEvReadResult()); - SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); - Result->Record.SetReadId(readId.ReadId); - SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release()); - return; - } - - // error happened and status set - auto& record = Result->Record; - if (record.HasStatus()) { - record.SetReadId(readId.ReadId); - record.SetSeqNo(state.SeqNo + 1); - SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release()); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxRead::Execute() finished with error, aborting: " << record.DebugString()); - Self->DeleteReadIterator(it); - return; - } - - Y_ASSERT(Reader); - Y_ASSERT(BlockBuilder); - - Reader->FillResult(*Result); - SendViaSession(state.SessionId, Sender, 
Self->SelfId(), Result.release()); - - // note that we save the state only when there're unread queries - if (Reader->HasUnreadQueries()) { - Y_ASSERT(it->second); - auto& state = *it->second; - Reader->UpdateState(state); - if (!state.IsExhausted()) { - ctx.Send( - Self->SelfId(), - new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId())); - } - } else { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " finished in read"); - Self->DeleteReadIterator(it); - } - } + Result.reset(new TEvDataShard::TEvReadResult()); -private: - void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state, bool& finished) { const auto& record = Request->Record; - if (!Request->Keys.empty() && !Request->Ranges.empty()) { - SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden"); - finished = true; - return; - } - - if (Request->Keys.empty() && Request->Ranges.empty()) { - SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Neither keys nor ranges"); - finished = true; - return; - } - - if (record.HasProgram()) { - SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "PushDown is not supported"); - finished = true; - return; - } - - if (record.ColumnsSize() == 0) { - SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing Columns"); - finished = true; - return; - } - state.ReadId = record.GetReadId(); state.PathId = TPathId( record.GetTableId().GetOwnerId(), @@ -777,47 +796,27 @@ private: state.Reverse = record.GetReverse(); - // note that we must call SyncSchemeOnFollower before we do any kind of checks - if (Self->IsFollower()) { - NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; - TString errMessage; - - if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { - finished = false; - return; - } - - if (status != NKikimrTxDataShard::TError::OK) { - SetStatusError( - 
Result->Record, - Ydb::StatusIds::INTERNAL_ERROR, - "Follower not ready"); - finished = true; - return; - } - } - if (state.PathId.OwnerId != Self->TabletID()) { // owner is schemeshard, read user table if (state.PathId.OwnerId != Self->GetPathOwnerId()) { - SetStatusError( - Result->Record, + SendErrorAndAbort( + ctx, + state, Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId << ", tableId: " << state.PathId.LocalPathId << ", from wrong owner: " << Self->GetPathOwnerId()); - finished = true; return; } - auto tableId = state.PathId.LocalPathId; + const auto tableId = state.PathId.LocalPathId; auto it = Self->TableInfos.find(tableId); if (it == Self->TableInfos.end()) { - SetStatusError( - Result->Record, + SendErrorAndAbort( + ctx, + state, Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId); - finished = true; + TStringBuilder() << "Unknown table id: " << tableId); return; } @@ -825,93 +824,303 @@ private: TableInfo = TShortTableInfo(userTableInfo); if (userTableInfo->IsBackup) { - SetStatusError( - Result->Record, + SendErrorAndAbort( + ctx, + state, Ydb::StatusIds::BAD_REQUEST, "Can't read from a backup table"); - finished = true; return; } - if (!state.ReadVersion.IsMax()) { - ui64 ownerId = state.PathId.OwnerId; - TSnapshotKey snapshotKey( - ownerId, - tableId, - state.ReadVersion.Step, - state.ReadVersion.TxId); - - bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); - bool allowMvcc = isMvccVersion && !Self->IsFollower(); - if (!Self->GetSnapshotManager().FindAvailable(snapshotKey) && !allowMvcc) { - SetStatusError( - Result->Record, - Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Table id " << tableId << " has no snapshot at " - << state.ReadVersion << " shard " << Self->TabletID() - << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() - << (Self->IsFollower() ? 
" RO replica" : "")); - finished = true; - return; - } + // we must have chosen the version + Y_VERIFY(!state.ReadVersion.IsMax()); + + const ui64 ownerId = state.PathId.OwnerId; + TSnapshotKey snapshotKey( + ownerId, + tableId, + state.ReadVersion.Step, + state.ReadVersion.TxId); + + bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool allowMvcc = isMvccVersion && !Self->IsFollower(); + bool snapshotFound = false; + if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) { + // TODO: do we need to acquire? + snapshotFound = true; + SetUsingSnapshotFlag(); + } else if (allowMvcc) { + snapshotFound = true; + SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId)); + } + + if (!snapshotFound) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Table id " << tableId << " has no snapshot at " + << state.ReadVersion << " shard " << Self->TabletID() + << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() + << (Self->IsFollower() ? " RO replica" : "")); + return; } state.SchemaVersion = userTableInfo->GetTableSchemaVersion(); if (record.GetTableId().HasSchemaVersion()) { if (state.SchemaVersion != record.GetTableId().GetSchemaVersion()) { - SetStatusError( - Result->Record, + SendErrorAndAbort( + ctx, + state, Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Wrong schemaversion " << record.GetTableId().GetSchemaVersion() << " requested, table schemaversion " << state.SchemaVersion); - finished = true; return; } } + // TODO: remove later, when we sure that key prefix is properly + // interpreted same way everywhere: i.e. treated as 0 at the left and + // inf on the right. 
+ // We really do weird transformations here, not sure if we can do better though + for (size_t i = 0; i < Request->Ranges.size(); ++i) { + const auto& key = Request->Ranges[i].From; + if (key.GetCells().size() == TableInfo.KeyColumnCount) + continue; + + if (state.FromKeys.size() != Request->Ranges.size()) { + state.FromKeys.resize(Request->Ranges.size()); + } + + // we can safely use cells referencing original Request->Keys[x], + // because request will live until the end + TVector<TCell> extendedCells; + extendedCells.reserve(TableInfo.KeyColumnCount); + for (const auto& cell: key.GetCells()) { + extendedCells.emplace_back(cell); + } + // extend with nulls + extendedCells.resize(TableInfo.KeyColumnCount, TCell()); + + // most evil part: serialize and then parse... + state.FromKeys[i] = TSerializedCellVec(TSerializedCellVec::Serialize(extendedCells)); + } + + // TODO: remove later, when we sure that key prefix is properly + // interpreted same way everywhere: i.e. treated as 0 at the left and + // inf on the right. + // We really do weird transformations here, not sure if we can do better though + for (size_t i = 0; i < Request->Keys.size(); ++i) { + const auto& key = Request->Keys[i]; + if (key.GetCells().size() == TableInfo.KeyColumnCount) + continue; + + if (state.Keys.size() != Request->Keys.size()) { + state.Keys.resize(Request->Keys.size()); + } + + // we can safely use cells referencing original Request->Keys[x], + // because request will live until the end + TVector<TCell> extendedCells; + extendedCells.reserve(TableInfo.KeyColumnCount); + for (const auto& cell: key.GetCells()) { + extendedCells.emplace_back(cell); + } + // extend with nulls + extendedCells.resize(TableInfo.KeyColumnCount, TCell()); + + // most evil part: serialize and then parse... 
+ state.Keys[i] = TSerializedCellVec(TSerializedCellVec::Serialize(extendedCells)); + } + userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now(); } else { // DS is owner, read system table - if (state.PathId.LocalPathId >= TDataShard::Schema::MinLocalTid) { - SetStatusError( - Result->Record, - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Only sys tables can be read by localTid, table " + + auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId); + if (!schema) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); - finished = true; return; } + TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema); + } - if (!state.ReadVersion.IsMax()) { - SetStatusError( - Result->Record, - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Only HEAD read from sys tables is allowed"); - finished = true; + state.Columns.reserve(record.ColumnsSize()); + for (auto col: record.GetColumns()) { + auto it = TableInfo.Columns.find(col); + if (it == TableInfo.Columns.end()) { + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::SCHEME_ERROR, + TStringBuilder() << "Unknown column: " << col); return; } - if (state.Format != NKikimrTxDataShard::CELLVEC) { - SetStatusError( - Result->Record, + state.Columns.push_back(col); + } + + { + auto p = CreateBlockBuilder(state, TableInfo); + if (!p.first) { + SendErrorAndAbort( + ctx, + state, Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Sys tables can be read only in cellvec format, but requested " - << (int)NKikimrTxDataShard::CELLVEC); - finished = true; + p.second); return; } + std::swap(BlockBuilder, p.first); + } - if (record.GetTableId().HasSchemaVersion()) { + state.Request = Request; + + Y_ASSERT(Result); + + state.State = TReadIteratorState::EState::Executing; + Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); + + PrepareValidationInfo(ctx, state); + } + + void SendResult(const 
TActorContext& ctx) override { + if (ResultSent) + return; + ResultSent = true; + + TReadIteratorId readId(Sender, Request->Record.GetReadId()); + auto it = Self->ReadIterators.find(readId); + if (it == Self->ReadIterators.end()) { + // the one who removed the iterator should have reply to user + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " has been invalidated before TReadOperation::SendResult()"); + return; + } + + Y_VERIFY(it->second); + auto& state = *it->second; + + if (!Result) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TReadOperation::Execute() finished without Result, aborting"); + Self->DeleteReadIterator(it); + + Result.reset(new TEvDataShard::TEvReadResult()); + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); + Result->Record.SetReadId(readId.ReadId); + Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + return; + } + + // error happened and status set + auto& record = Result->Record; + if (record.HasStatus()) { + record.SetReadId(readId.ReadId); + record.SetSeqNo(state.SeqNo + 1); + Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TReadOperation::Execute() finished with error, aborting: " << record.DebugString()); + Self->DeleteReadIterator(it); + return; + } + + Y_ASSERT(Reader); + Y_ASSERT(BlockBuilder); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << ", firstUnprocessed# " << state.FirstUnprocessedQuery); + + Reader->FillResult(*Result); + Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + } + + void Complete(const TActorContext& ctx) override { + 
SendResult(ctx); + + TReadIteratorId readId(Sender, Request->Record.GetReadId()); + auto it = Self->ReadIterators.find(readId); + if (it == Self->ReadIterators.end()) { + // the one who removed the iterator should have reply to user + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " has been invalidated before TReadOperation::Complete()"); + return; + } + + Y_VERIFY(it->second); + + // note that we save the state only when there're unread queries + if (Reader->HasUnreadQueries()) { + Y_ASSERT(it->second); + auto& state = *it->second; + Reader->UpdateState(state); + if (!state.IsExhausted()) { + ctx.Send( + Self->SelfId(), + new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId())); + } + } else { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " finished in read"); + Self->DeleteReadIterator(it); + } + } + +private: + void SendErrorAndAbort( + const TActorContext& ctx, + TReadIteratorState& state, + Ydb::StatusIds::StatusCode code, + const TString& msg) + { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " aborted read iterator# " + << state.ReadId << " with msg " << msg); + + if (!Result) + Result.reset(new TEvDataShard::TEvReadResult()); + + SetStatusError(Result->Record, code, msg); + Result->Record.SetReadId(state.ReadId); + Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + + // note that we don't need to execute any other unit + Abort(EExecutionUnitKind::CompletedOperations); + + TReadIteratorId readId(Sender, state.ReadId); + Self->DeleteReadIterator(readId); + } + + // return semantics is like in Execute() + bool Read(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) { + const auto& tableId = state.PathId.LocalPathId; + if (state.PathId.OwnerId == Self->GetPathOwnerId()) { + auto it = Self->TableInfos.find(tableId); + if (it == Self->TableInfos.end()) { 
SetStatusError( Result->Record, - Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Requesting system stable owned " << state.PathId.OwnerId - << ", localTid: " << state.PathId.LocalPathId - << ", with schema: " << record.GetTableId().GetSchemaVersion()); - finished = true; - return; + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId); + return true; + } + auto userTableInfo = it->second; + TableInfo = TShortTableInfo(userTableInfo); + auto currentSchemaVersion = TableInfo.SchemaVersion; + if (state.SchemaVersion != currentSchemaVersion) { + SetStatusError( + Result->Record, + Ydb::StatusIds::SCHEME_ERROR, + TStringBuilder() << "Schema changed, current " << currentSchemaVersion + << ", requested table schemaversion " << state.SchemaVersion); + return true; } + userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now(); + } else { auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId); if (!schema) { SetStatusError( @@ -919,61 +1128,30 @@ private: Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); - finished = true; - return; + return true; } TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema); } - if (Self->IsFollower()) { - if (!state.ReadVersion.IsMax()) { - // check that follower has this version - NIceDb::TNiceDb db(txc.DB); - TRowVersion lastCompleteTx; - if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step)) { - finished = false; - return; - } - if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId)) { - finished = false; - return; - } - - if (state.ReadVersion > lastCompleteTx) { - // it would be better to have something like retry later - SetStatusError( - Result->Record, - Ydb::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Version " << state.ReadVersion - << " is not available on follower yet"); - finished = true; - return; - } - } else { - SetStatusError( - 
Result->Record, - Ydb::StatusIds::UNSUPPORTED, - "HEAD version on followers is unsupported"); - finished = true; - return; - } - - // TODO: check that no lock requested - } + ui64 ownerId = state.PathId.OwnerId; + TSnapshotKey snapshotKey( + ownerId, + tableId, + state.ReadVersion.Step, + state.ReadVersion.TxId); - state.Columns.reserve(record.ColumnsSize()); - for (auto col: record.GetColumns()) { - auto it = TableInfo.Columns.find(col); - if (it == TableInfo.Columns.end()) { - SetStatusError( - Result->Record, - Ydb::StatusIds::SCHEME_ERROR, - TStringBuilder() << "Unknown column: " << col); - finished = true; - return; - } + bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark(); + bool allowMvcc = isMvccVersion && !Self->IsFollower(); + if (!Self->GetSnapshotManager().FindAvailable(snapshotKey) && !allowMvcc) { + SetStatusError( + Result->Record, + Ydb::StatusIds::ABORTED, + TStringBuilder() << "Table id " << tableId << " lost snapshot at " + << state.ReadVersion << " shard " << Self->TabletID() + << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() + << (Self->IsFollower() ? 
" RO replica" : "")); - state.Columns.push_back(col); + return true; } { @@ -983,23 +1161,101 @@ private: Result->Record, Ydb::StatusIds::BAD_REQUEST, p.second); - finished = true; - return; + return true; } std::swap(BlockBuilder, p.first); } - state.Request = Request; - Y_ASSERT(Result); - state.State = TReadIteratorState::EState::Executing; Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); + return Reader->Read(txc, ctx); + } + + void PrepareValidationInfo(const TActorContext&, const TReadIteratorState& state) { + TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion); + + TVector<NScheme::TTypeId> keyTypes; + + TVector<TKeyDesc::TColumnOp> columnOps; + columnOps.reserve(TableInfo.Columns.size()); + for (const auto& it: TableInfo.Columns) { + const auto& column = it.second; + TKeyDesc::TColumnOp op; + op.Column = it.first; + op.Operation = TKeyDesc::EColumnOperation::Read; + op.ExpectedType = column.Type; + columnOps.emplace_back(std::move(op)); + } - finished = false; + if (!state.Request->Keys.empty()) { + for (size_t i = 0; i < state.Request->Keys.size(); ++i) { + THolder<TKeyDesc> desc; + const auto& key = state.Request->Keys[i]; + if (key.GetCells().size() != TableInfo.KeyColumnCount) { + // key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf] + TTableRange range( + state.Keys[i].GetCells(), + true, + key.GetCells(), + true); + + desc = MakeHolder<TKeyDesc>( + tableId, + range, + TKeyDesc::ERowOperation::Read, + TableInfo.KeyColumnTypes, + columnOps, + state.Quota.Rows, + state.Quota.Bytes, + state.Reverse); + } else { + desc = MakeHolder<TKeyDesc>( + tableId, + TTableRange(key.GetCells()), + TKeyDesc::ERowOperation::Read, + TableInfo.KeyColumnTypes, + columnOps, + state.Quota.Rows, + state.Quota.Bytes, + state.Reverse); + } + + ValidationInfo.Keys.emplace_back( + NMiniKQL::IEngineFlat::TValidatedKey( + std::move(desc), + /* isWrite */ false)); + ++ValidationInfo.ReadsCount; + } + } else { + // since 
no keys, then we must have ranges (has been checked initially) + for (size_t i = 0; i < state.Request->Ranges.size(); ++i) { + TTableRange range = state.Request->Ranges[i].ToTableRange(); + if (range.From.size() != TableInfo.KeyColumnCount) + range.From = state.FromKeys[i].GetCells(); + + auto desc = MakeHolder<TKeyDesc>( + tableId, + range, + TKeyDesc::ERowOperation::Read, + TableInfo.KeyColumnTypes, + columnOps, + state.Quota.Rows, + state.Quota.Bytes, + state.Reverse); + + ValidationInfo.Keys.emplace_back( + NMiniKQL::IEngineFlat::TValidatedKey( + std::move(desc), + /* isWrite */ false)); + ++ValidationInfo.ReadsCount; + } + } + + ValidationInfo.Loaded = true; } - TLockInfo::TPtr AcquireLock(const TActorContext& ctx, TReadIteratorState& state) { + void AcquireLock(const TActorContext& ctx, TReadIteratorState& state) { auto& sysLocks = Self->SysLocksTable(); auto& locker = sysLocks.GetLocker(); @@ -1008,13 +1264,15 @@ private: TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion); TLockInfo::TPtr lock; + state.LockTxId = lockTxId; + if (!state.Request->Keys.empty()) { for (size_t i = 0; i < state.Request->Keys.size(); ++i) { const auto& key = state.Request->Keys[i]; if (key.GetCells().size() != TableInfo.KeyColumnCount) { // key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf] TTableRange lockRange( - key.GetCells(), + state.Keys[i].GetCells(), true, key.GetCells(), true); @@ -1024,52 +1282,140 @@ private: TPointKey pointKey = locker.MakePoint(tableId, key.GetCells()); lock = locker.AddPointLock(lockTxId, lockNodeId, pointKey, state.ReadVersion); } - if (!lock) - return nullptr; } } else { // since no keys, then we must have ranges (has been checked initially) for (size_t i = 0; i < state.Request->Ranges.size(); ++i) { - const auto& range = state.Request->Ranges[i]; - - TTableRange lockRange( - range.From.GetCells(), - range.FromInclusive, - range.To.GetCells(), - range.ToInclusive); - - TRangeKey rangeKey = 
locker.MakeRange(tableId, lockRange); + auto range = state.Request->Ranges[i].ToTableRange(); + if (range.From.size() != TableInfo.KeyColumnCount) + range.From = state.FromKeys[i].GetCells(); + TRangeKey rangeKey = locker.MakeRange(tableId, range); lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion); - if (!lock) - return nullptr; } } - Y_VERIFY(lock); + ui64 counter; + ui64 lockId; + bool isBroken; + if (lock) { + counter = lock->GetCounter(state.ReadVersion); + lockId = lock->GetLockId(); + isBroken = lock->IsBroken(state.ReadVersion); + } else { + counter = TSysTables::TLocksTable::TLock::ErrorNotSet; + lockId = lockTxId; + isBroken = true; + } + + if (!isBroken && Reader->HasInvisibleRowSkips()) { + locker.BreakLock(lockTxId, TRowVersion::Min()); + isBroken = true; + counter = TSysTables::TLocksTable::TLock::ErrorAlreadyBroken; + } - auto counter = lock->GetCounter(state.ReadVersion); sysLocks.UpdateCounters(counter); NKikimrTxDataShard::TLock *addLock; - if (!lock->IsBroken(state.ReadVersion)) { + if (!isBroken) { addLock = Result->Record.AddTxLocks(); } else { addLock = Result->Record.AddBrokenTxLocks(); } - addLock->SetLockId(lock->GetLockId()); + addLock->SetLockId(lockId); addLock->SetDataShard(Self->TabletID()); addLock->SetGeneration(Self->Generation()); - addLock->SetCounter(lock->GetCounter()); + addLock->SetCounter(counter); addLock->SetSchemeShard(state.PathId.OwnerId); addLock->SetPathId(state.PathId.LocalPathId); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() - << " Acquired lock# " << lock->GetLockId() << ", counter# " << lock->GetCounter() + << " Acquired lock# " << lockId << ", counter# " << counter << " for " << state.PathId); - state.LockTxId = lockTxId; - return lock; + state.Lock = lock; // note that might be nullptr + } +}; + +class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionBase<TDataShard> { + TEvDataShard::TEvRead::TPtr Ev; + TReadIteratorId ReadId; + + 
TOperation::TPtr Op; + TVector<EExecutionUnitKind> CompleteList; + +public: + TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev) + : TBase(ds) + , Ev(std::move(ev)) + , ReadId(Ev->Sender, Ev->Get()->Record.GetReadId()) + {} + + TTxType GetTxType() const override { return TXTYPE_READ; } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline execute" + << ": at tablet# " << Self->TabletID()); + + auto it = Self->ReadIterators.find(ReadId); + if (it == Self->ReadIterators.end()) { + // iterator aborted + return true; + } + + auto& state = *it->second; + + // If tablet is in follower mode then we should sync scheme + // before we build and check operation. + if (Self->IsFollower()) { + NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; + TString errMessage; + + if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { + return false; + } + + if (status != NKikimrTxDataShard::TError::OK) { + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError( + result->Record, + Ydb::StatusIds::INTERNAL_ERROR, + TStringBuilder() << "Failed to sync follower: " << errMessage); + result->Record.SetReadId(ReadId.ReadId); + SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release()); + + return true; + } + } + + if (Ev) { + const ui64 tieBreaker = Self->NextTieBreakerIndex++; + Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev); + Op->BuildExecutionPlan(false); + Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); + + Ev = nullptr; + } + + auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx); + if (!CompleteList.empty()) { + return true; + } else if (status == EExecutionStatus::Restart) { + return false; + } else { + Op = nullptr; + return true; + } + } + + void Complete(const TActorContext& ctx) override { + 
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete" + << ": at tablet# " << Self->TabletID()); + + Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } } }; @@ -1099,12 +1445,18 @@ public: auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { // read has been aborted + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# " + << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << " didn't found state"); return true; } Y_ASSERT(it->second); auto& state = *it->second; + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# " + << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId + << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery); + Result.reset(new TEvDataShard::TEvReadResult()); const auto& tableId = state.PathId.LocalPathId; @@ -1115,6 +1467,7 @@ public: Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId); + SendResult(ctx); return true; } auto userTableInfo = it->second; @@ -1126,6 +1479,7 @@ public: Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Schema changed, current " << currentSchemaVersion << ", requested table schemaversion " << state.SchemaVersion); + SendResult(ctx); return true; } @@ -1138,6 +1492,7 @@ public: Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); + SendResult(ctx); return true; } TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema); @@ -1160,7 +1515,7 @@ public: << state.ReadVersion << " shard " << Self->TabletID() << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() << (Self->IsFollower() ? 
" RO replica" : "")); - + SendResult(ctx); return true; } @@ -1171,6 +1526,7 @@ public: Result->Record, Ydb::StatusIds::BAD_REQUEST, p.second); + SendResult(ctx); return true; } std::swap(BlockBuilder, p.first); @@ -1179,25 +1535,26 @@ public: Y_ASSERT(Result); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() - << " ReadContinue: " << Ev->Get()->Reader << "," << Ev->Get()->ReadId); + << " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId + << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery); Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); - return Reader->Read(txc, ctx); + if (Reader->Read(txc, ctx)) { + SendResult(ctx); + return true; + } + return false; } - void Complete(const TActorContext& ctx) override { - // TODO: it is complete copypaste from TEvRead::Complete() + void Complete(const TActorContext&) override { + // nothing to do + } + void SendResult(const TActorContext& ctx) { const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); - if (it == Self->ReadIterators.end()) { - // the one who removed the iterator should have reply to user - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " has been invalidated before TTxReadContinue::Complete()"); - return; - } - + Y_VERIFY(it != Self->ReadIterators.end()); Y_VERIFY(it->second); auto& state = *it->second; @@ -1209,7 +1566,7 @@ public: Result.reset(new TEvDataShard::TEvReadResult()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); - SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); return; } @@ -1218,43 +1575,45 @@ public: if (record.HasStatus()) { record.SetSeqNo(state.SeqNo + 1); record.SetReadId(readId.ReadId); - 
SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString()); Self->DeleteReadIterator(it); return; } - if (state.LockTxId) { - auto& sysLocks = Self->SysLocksTable(); - auto& locker = sysLocks.GetLocker(); - auto lock = locker.GetLock(state.LockTxId, state.ReadVersion, true); - if (!lock) { - // just sanity check, should not happen - SetStatusError(record, Ydb::StatusIds::INTERNAL_ERROR, "Failed to find lock"); - record.SetSeqNo(state.SeqNo + 1); - record.SetReadId(readId.ReadId); - SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); - LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxReadContinue::Execute() lost lock for lockTxId# " << state.LockTxId - << " in version# " << state.ReadVersion << ", aborting: " << record.DebugString()); - Self->DeleteReadIterator(it); - return; + if (state.Lock && !state.ReportedLockBroken) { + bool isBroken = false; + ui64 counter; + ui64 lockId; + if (state.Lock->IsBroken(state.ReadVersion)) { + isBroken = true; + counter = state.Lock->GetCounter(state.ReadVersion); + lockId = state.Lock->GetLockId(); + + } else if (Reader->HasInvisibleRowSkips()) { + isBroken = true; + counter = TSysTables::TLocksTable::TLock::ErrorBroken; + lockId = state.LockTxId; + + auto& sysLocks = Self->SysLocksTable(); + auto& locker = sysLocks.GetLocker(); + locker.BreakLock(state.LockTxId, TRowVersion::Min()); + sysLocks.UpdateCounters(counter); } - if (lock->IsBroken() && !state.ReportedLockBroken) { + if (isBroken) { state.ReportedLockBroken = true; NKikimrTxDataShard::TLock *addLock = record.AddBrokenTxLocks(); - addLock->SetLockId(lock->GetLockId()); + 
addLock->SetLockId(lockId); addLock->SetDataShard(Self->TabletID()); addLock->SetGeneration(Self->Generation()); - addLock->SetCounter(lock->GetCounter()); + addLock->SetCounter(counter); addLock->SetSchemeShard(state.PathId.OwnerId); addLock->SetPathId(state.PathId.LocalPathId); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxReadContinue::Execute() found broken lock# " << lock->GetLockId() - << " with lockTxId# " << state.LockTxId); + << " TTxReadContinue::Execute() found broken lock# " << lockId); } } @@ -1262,7 +1621,7 @@ public: Y_ASSERT(BlockBuilder); Reader->FillResult(*Result); - SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); if (Reader->HasUnreadQueries()) { Y_ASSERT(it->second); @@ -1285,7 +1644,15 @@ public: }; void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; + if (MediatorStateWaiting) { + MediatorStateWaitingMsgs.emplace_back(ev.Release()); + UpdateProposeQueueSize(); + return; + } + + // note that ins some cases we mutate this request below + const auto* request = ev->Get(); + const auto& record = request->Record; if (Y_UNLIKELY(!record.HasReadId())) { std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing ReadId"); @@ -1305,6 +1672,23 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct ctx.Send(ev->Sender, result.release()); }; + if (Pipeline.HasDrop()) { + replyWithError( + Ydb::StatusIds::INTERNAL_ERROR, + TStringBuilder() << "Request " << readId.ReadId << " rejected, because pipeline is in process of drop"); + return; + } + + size_t totalInFly = + ReadIteratorsInFly() + TxInFly() + ImmediateInFly() + + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + 
TxWaiting(); + if (totalInFly > GetMaxTxInFly()) { + replyWithError( + Ydb::StatusIds::OVERLOADED, + TStringBuilder() << "Request " << readId.ReadId << " rejected, MaxTxInFly was exceeded"); + return; + } + if (Y_UNLIKELY(ReadIterators.contains(readId))) { replyWithError( Ydb::StatusIds::ALREADY_EXISTS, @@ -1312,16 +1696,46 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } + if (!request->Keys.empty() && !request->Ranges.empty()) { + replyWithError(Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden"); + return; + } + + if (request->Keys.empty() && request->Ranges.empty()) { + replyWithError(Ydb::StatusIds::BAD_REQUEST, "Neither keys nor ranges"); + return; + } + + if (record.HasProgram()) { + replyWithError(Ydb::StatusIds::BAD_REQUEST, "PushDown is not supported"); + return; + } + + if (record.ColumnsSize() == 0) { + replyWithError(Ydb::StatusIds::BAD_REQUEST, "Missing Columns"); + return; + } + TRowVersion readVersion = TRowVersion::Max(); if (record.HasSnapshot()) { readVersion.Step = record.GetSnapshot().GetStep(); readVersion.TxId = record.GetSnapshot().GetTxId(); + } else if (record.GetTableId().GetOwnerId() != TabletID() && !IsFollower()) { + // sys table reads must be from HEAD, + // user tables are allowed to be read from HEAD. + // + // TODO: currently we transform HEAD read to MVCC. 
+ // Instead we should try to read from HEAD and if full read + // done in single execution - succeed, if not - drop operation + // and transform HEAD to MVCC + readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr); + ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step); + ev->Get()->Record.MutableSnapshot()->SetTxId(Max<ui64>()); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed head to: " << readVersion.Step); } if (!IsFollower()) { - // MVCC is not allowed on followers, thus here we check only - // leader case, snapshot for follower is checked withing transaction, - // because we need to read from sys table if (record.GetTableId().GetOwnerId() != TabletID()) { // owner is schemeshard, read user table if (readVersion.IsMax()) { @@ -1339,7 +1753,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct readVersion.TxId); bool snapshotFound = GetSnapshotManager().FindAvailable(snapshotKey); - if (!snapshotFound && !IsFollower()) { + if (!snapshotFound) { // check if there is MVCC version and maybe wait if (readVersion < GetSnapshotManager().GetLowWatermark()) { replyWithError( @@ -1381,6 +1795,47 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct TStringBuilder() << "Only HEAD read from sys tables is allowed"); return; } + + if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) { + replyWithError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Only sys tables can be read by localTid, table " + << record.GetTableId().GetTableId()); + return; + } + + if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) { + replyWithError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Sys tables can be read only in cellvec format, but requested " + << (int)NKikimrTxDataShard::CELLVEC); + return; + } + + if (record.GetTableId().HasSchemaVersion()) { + replyWithError( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Requesting system 
stable owned " << record.GetTableId().GetOwnerId() + << ", localTid: " << record.GetTableId().GetTableId() + << ", with schema: " << record.GetTableId().GetSchemaVersion()); + return; + } + } + } else { + // follower: we can't check snapshot version, because need to sync and to sync + // we need transaction + if (readVersion.IsMax()) { + replyWithError( + Ydb::StatusIds::UNSUPPORTED, + "HEAD version on followers is unsupported"); + return; + } + + if (record.GetTableId().GetOwnerId() == TabletID()) { + replyWithError( + Ydb::StatusIds::UNSUPPORTED, + "Systable reads on followers are not supported"); + return; } } @@ -1405,7 +1860,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } ReadIterators.emplace(readId, new TReadIteratorState(sessionId)); - Executor()->Execute(new TTxRead(this, ev), ctx); + Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx); } void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 005823dae4a..e0774b9d4c3 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -466,6 +466,7 @@ void TDataShard::CollectCpuUsage(const TActorContext &ctx) { << "% is higher than threshold of " << (i64)CpuUsageReportThreshlodPercent << "% in-flight Tx: " << TxInFly() << " immediate Tx: " << ImmediateInFly() + << " readIterators: " << ReadIteratorsInFly() << " at datashard: " << TabletID() << " table: " << names); } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index a1c79a3a0d8..def43a6bff3 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -185,7 +185,6 @@ class TDataShard class TTxInitiateStatsUpdate; class TTxCheckInReadSets; class TTxRemoveOldInReadSets; - class TTxRead; class TTxReadContinue; class TTxReadColumns; class 
TTxGetInfo; @@ -213,6 +212,9 @@ class TDataShard class TTxUploadRows; class TTxEraseRows; + class TTxReadViaPipeline; + class TReadOperation; + ITransaction *CreateTxMonitoring(TDataShard *self, NMon::TEvRemoteHttpInfo::TPtr ev); ITransaction *CreateTxGetInfo(TDataShard *self, @@ -1182,6 +1184,10 @@ public: ui64 ImmediateInFly() const { return Pipeline.ImmediateInFly(); } ui64 TxWaiting() const { return Pipeline.WaitingTxs() + Pipeline.WaitingReadIterators(); } + // note that not part of ImmediateInFly() to not block scheme ops: + // we rather abort iterator if scheme changes between iterations + ui64 ReadIteratorsInFly() const { return ReadIterators.size();} + inline TRowVersion LastCompleteTxVersion() const { auto order = Pipeline.GetLastCompleteTx(); return TRowVersion(order.Step, order.TxId); @@ -1509,8 +1515,17 @@ public: ui64 GetMaxObservedStep() const; void SendImmediateWriteResult( const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0); - void SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie = 0); - void SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie = 0); + void SendImmediateReadResult( + TMonotonic readTime, + const TActorId& target, + IEventBase* event, + ui64 cookie = 0, + const TActorId& sessionId = {}); + void SendImmediateReadResult( + const TActorId& target, + IEventBase* event, + ui64 cookie = 0, + const TActorId& sessionId = {}); void SendAfterMediatorStepActivate(ui64 mediatorStep); void CheckMediatorStateRestored(); diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 6ec6d7c1021..7b4181b0bc1 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -144,11 +144,11 @@ TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TR return lock; } -TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& 
at, bool brokenIsOK) const { +TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const { auto it = Locks.find(lockTxId); if (it != Locks.end()) { TLockInfo::TPtr lock = it->second; - if (!lock->IsBroken(at) || brokenIsOK) + if (!lock->IsBroken(at)) return lock; } return nullptr; diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 763da40ebeb..8217e6cbbde 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -302,7 +302,7 @@ public: TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at); TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at); TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at); - TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at, bool brokenIsOK = false) const; + TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const; ui64 LocksCount() const { return Locks.size(); } ui64 BrokenLocksCount() const { return BrokenLocks.size() + BrokenCandidates.size(); } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index e9c96bee255..eead6bab8af 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1603,7 +1603,9 @@ void TPipeline::MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const { bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { // check in-flight limit - size_t totalInFly = (Self->TxInFly() + Self->ImmediateInFly() + Self->ProposeQueue.Size() + WaitingDataTxOps.size()); + size_t totalInFly = + Self->ReadIteratorsInFly() + Self->TxInFly() + Self->ImmediateInFly() + + Self->ProposeQueue.Size() + WaitingDataTxOps.size(); if (totalInFly > Self->GetMaxTxInFly()) return 
false; // let tx to be rejected diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h new file mode 100644 index 00000000000..eb6208e6ddd --- /dev/null +++ b/ydb/core/tx/datashard/datashard_read_operation.h @@ -0,0 +1,21 @@ +#pragma once + +#include "datashard_impl.h" +#include "operation.h" + +namespace NKikimr::NDataShard { + +class IReadOperation { +public: + virtual ~IReadOperation() = default; + + // our interface for TReadUnit + virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) = 0; + virtual void SendResult(const TActorContext& ctx) = 0; + virtual void Complete(const TActorContext& ctx) = 0; + + // our interface for TCheckReadUnit + virtual void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) = 0; +}; + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 1cba91cbcb6..9d5f98871bd 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -1,8 +1,10 @@ #include "datashard_ut_common.h" +#include "datashard_ut_common_kqp.h" #include "datashard_active_transaction.h" #include "read_iterator.h" #include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/tx_proxy/read_table.h> @@ -25,7 +27,8 @@ void CreateTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, const TString &name, - bool withFollower = false) + bool withFollower = false, + ui64 shardCount = 1) { TVector<TShardedTableOptions::TColumn> columns = { {"key1", "Uint32", true, false}, @@ -35,7 +38,7 @@ void CreateTable(Tests::TServer::TPtr server, }; auto opts = TShardedTableOptions() - .Shards(1) + .Shards(shardCount) .Columns(columns); if (withFollower) @@ -288,7 +291,9 @@ struct TTestHelper { init(serverSettings); } - 
explicit TTestHelper(const TServerSettings& serverSettings) { + explicit TTestHelper(const TServerSettings& serverSettings, ui64 shardCount = 1, bool withFollower = false) { + WithFollower = withFollower; + ShardCount = shardCount; init(serverSettings); } @@ -306,7 +311,7 @@ struct TTestHelper { auto& table1 = Tables["table-1"]; table1.Name = "table-1"; { - CreateTable(Server, Sender, "/Root", "table-1", WithFollower); + CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount); ExecSQL(Server, Sender, R"( UPSERT INTO `/Root/table-1` (key1, key2, key3, value) @@ -444,7 +449,8 @@ struct TTestHelper { const TString& tableName, TEvDataShard::TEvRead* request, ui32 node = 0, - TActorId sender = {}) + TActorId sender = {}, + TDuration timeout = TDuration::Max()) { if (!sender) { sender = Sender; @@ -460,7 +466,7 @@ struct TTestHelper { GetTestPipeConfig(), table.ClientId); - return WaitReadResult(); + return WaitReadResult(timeout); } void SendReadAck( @@ -531,13 +537,24 @@ struct TTestHelper { auto readResult = SendRead(tableName, request.release()); - UNIT_ASSERT_VALUES_EQUAL(readResult->Record.TxLocksSize(), 0); - UNIT_ASSERT_VALUES_EQUAL(readResult->Record.BrokenTxLocksSize(), 1); + const NKikimrTxDataShard::TLock* prevLock; + if (prevResult.Record.TxLocksSize()) { + prevLock = &prevResult.Record.GetTxLocks(0); + } else { + prevLock = &prevResult.Record.GetBrokenTxLocks(0); + } - const auto& lock = prevResult.Record.GetTxLocks(0); - const auto& brokenLock = readResult->Record.GetBrokenTxLocks(0); - UNIT_ASSERT_VALUES_EQUAL(lock.GetLockId(), brokenLock.GetLockId()); - UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); + const NKikimrTxDataShard::TLock* newLock; + if (readResult->Record.TxLocksSize()) { + newLock = &readResult->Record.GetTxLocks(0); + } else { + newLock = &readResult->Record.GetBrokenTxLocks(0); + } + + UNIT_ASSERT(newLock && prevLock); + UNIT_ASSERT_VALUES_EQUAL(newLock->GetLockId(), prevLock->GetLockId()); + 
UNIT_ASSERT(newLock->GetCounter() != prevLock->GetCounter() + || newLock->GetGeneration() != prevLock->GetGeneration()); } NTabletPipe::TClientConfig GetTestPipeConfig() { @@ -549,6 +566,7 @@ struct TTestHelper { public: bool WithFollower = false; + ui64 ShardCount = 1; Tests::TServer::TPtr Server; TActorId Sender; @@ -751,7 +769,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { const auto& record1 = readResult1->Record; UNIT_ASSERT(!record1.GetLimitReached()); UNIT_ASSERT(record1.HasSeqNo()); - UNIT_ASSERT(!record1.HasFinished()); + //UNIT_ASSERT(!record1.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL); // TODO: check continuation token @@ -1534,6 +1552,43 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { TestReadKey(NKikimrTxDataShard::CELLVEC, true); } + Y_UNIT_TEST(ShouldNotReadMvccFromFollower) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetUseRealThreads(false); + + const ui64 shardCount = 1; + TTestHelper helper(serverSettings, shardCount, true); + + TRowVersion someVersion = TRowVersion(10000, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, someVersion); + AddKeyQuery(*request, {3, 3, 3}); + auto readResult = helper.SendRead("table-1", request.release()); + const auto& record = readResult->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::NOT_FOUND); + } + + Y_UNIT_TEST(ShouldNotReadHeadFromFollower) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetUseRealThreads(false); + + const ui64 shardCount = 1; + TTestHelper helper(serverSettings, shardCount, true); + + TRowVersion someVersion = TRowVersion(10000, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, someVersion); + 
request->Record.ClearSnapshot(); + AddKeyQuery(*request, {3, 3, 3}); + auto readResult = helper.SendRead("table-1", request.release()); + const auto& record = readResult->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED); + } + Y_UNIT_TEST(ShouldStopWhenDisconnected) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); @@ -1568,8 +1623,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { AddKeyQuery(*request1, {3, 3, 3}); AddKeyQuery(*request1, {1, 1, 1}); - // set quota so that DS hangs waiting for ACK - request1->Record.SetMaxRows(1); + request1->Record.SetMaxRows(1); // set quota so that DS hangs waiting for ACK auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender); @@ -1584,7 +1638,268 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0); } - Y_UNIT_TEST(ShouldReturnMvccSnapshot) { + Y_UNIT_TEST(ShouldReadFromHead) { + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max()); + request->Record.ClearSnapshot(); + AddKeyQuery(*request, {3, 3, 3}); + + auto readResult = helper.SendRead("table-1", request.release()); + UNIT_ASSERT(readResult); + UNIT_ASSERT(readResult->Record.HasSnapshot()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 300}, + }); + } + + Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseNewEngine) { + // 1. Start read-write multishard transaction: readset will be blocked + // to hang transaction. Write is the key we want to read. + // 2a. Check that we can read prior blocked step. + // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait + // for the hanging transaction. + // 3. Finish hanging write. + // 4. MVCC read must finish, do another MVCC read of same version for sanity check + // that read is repeatable. + // 5. 
Read prior data again + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetUseRealThreads(false); + + const ui64 shardCount = 1; + TTestHelper helper(serverSettings, shardCount); + const auto& sender = helper.Sender; + + auto& runtime = *helper.Server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG); + + CreateTable(helper.Server, sender, "/Root", "table-2", false, shardCount); + ExecSQL(helper.Server, sender, R"( + UPSERT INTO `/Root/table-2` + (key1, key2, key3, value) + VALUES + (1, 1, 1, 1000), + (3, 3, 3, 3000), + (5, 5, 5, 5000), + (8, 0, 0, 8000), + (8, 0, 1, 8010), + (8, 1, 0, 8020), + (8, 1, 1, 8030), + (11, 11, 11, 11110); + )"); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + helper.Server->GetRuntime()->DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... 
failed to wait for " << description); + } + }; + + bool capturePlanStep = true; + bool dropRS = true; + + ui64 lastPlanStep = 0; + TVector<THolder<IEventHandle>> readSets; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { + switch (event->GetTypeRewrite()) { + case TEvTxProcessing::EvPlanStep: { + if (capturePlanStep) { + auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>(); + lastPlanStep = planMessage->Record.GetStep(); + } + break; + } + case TEvTxProcessing::EvReadSet: { + if (dropRS) { + readSets.push_back(std::move(event)); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents); + + capturePlanStep = true; + + // Send SQL request which should hang due to lost RS + // We will capture its planstep + SendSQL( + helper.Server, + sender, + Q_("UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`")); + + waitFor([&]{ return lastPlanStep != 0; }, "intercepted TEvPlanStep"); + capturePlanStep = false; + const auto hangedStep = lastPlanStep; + + // With mvcc (or a better dependency tracking) the read below may start out-of-order, + // because transactions above are stuck before performing any writes. Make sure it's + // forced to wait for above transactions by committing a write that is guaranteed + // to "happen" after transactions above. 
+ SendSQL(helper.Server, sender, Q_((R"( + UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234); + UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345); + )"))); + + waitFor([&]{ return readSets.size() == 1; }, "intercepted RS"); + + // 2a: read prior data + { + auto oldVersion = TRowVersion(hangedStep - 1, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, oldVersion); + AddKeyQuery(*request, {3, 3, 3}); + + auto readResult = helper.SendRead("table-1", request.release()); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 300} + }); + } + + // 2b-1 (key): try to read hanged step, note that we have hanged write to the same key + { + auto oldVersion = TRowVersion(hangedStep, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, oldVersion); + AddKeyQuery(*request, {3, 3, 3}); + + auto readResult = helper.SendRead( + "table-1", + request.release(), + 0, + helper.Sender, + TDuration::MilliSeconds(100)); + UNIT_ASSERT(!readResult); // read is blocked by conflicts + } + + // 2b-2 (range): try to read hanged step, note that we have hanged write to the same key + { + auto oldVersion = TRowVersion(hangedStep, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 2, NKikimrTxDataShard::ARROW, oldVersion); + + AddRangeQuery<ui32>( + *request, + {1, 1, 1}, + true, + {5, 5, 5}, + true + ); + + auto readResult = helper.SendRead( + "table-1", + request.release(), + 0, + helper.Sender, + TDuration::MilliSeconds(100)); + UNIT_ASSERT(!readResult); // read is blocked by conflicts + } + + // 2b-3 (key prefix, equals to range): try to read hanged step, note that we have hanged write to the same key + { + auto oldVersion = TRowVersion(hangedStep, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 3, 
NKikimrTxDataShard::ARROW, oldVersion); + AddKeyQuery(*request, {3}); + + auto readResult = helper.SendRead( + "table-1", + request.release(), + 0, + helper.Sender, + TDuration::MilliSeconds(100)); + UNIT_ASSERT(!readResult); // read is blocked by conflicts + } + + // 3. Don't catch RS any more and send caught ones to proceed with upserts. + runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc); + for (auto &rs : readSets) + runtime.Send(rs.Release()); + + // Wait for upserts and immediate tx to finish. + { + TDispatchOptions options; + options.FinalEvents.emplace_back(IsTxResultComplete(), 3); + runtime.DispatchEvents(options); + } + + // read 2b-1 should finish now + { + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 3000} + }); + } + + // read 2b-2 should finish now + { + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {1, 1, 1, 1000}, + {3, 3, 3, 3000}, + {5, 5, 5, 5000} + }); + } + + // read 2b-3 should finish now + { + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 3000} + }); + } + + // 4: try to read hanged step again + { + auto oldVersion = TRowVersion(hangedStep, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 4, NKikimrTxDataShard::ARROW, oldVersion); + AddKeyQuery(*request, {3, 3, 3}); + + auto readResult = helper.SendRead("table-1", request.release()); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 3000} + }); + } + + // 5: read prior data again + { + auto oldVersion = 
TRowVersion(hangedStep - 1, Max<ui64>()); + auto request = helper.GetBaseReadRequest("table-1", 5, NKikimrTxDataShard::ARROW, oldVersion); + AddKeyQuery(*request, {3, 3, 3}); + + auto readResult = helper.SendRead("table-1", request.release()); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 300} + }); + } + } + + Y_UNIT_TEST(ShouldReturnMvccSnapshotFromFuture) { + // checks that when snapshot is in future, we wait for it + TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -1801,6 +2116,54 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); } + Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips) { + // If we read in v1, write in v2, then write breaks lock. + // Because of out of order execution, v2 can happen before v1 + // and we should properly handle it in DS to break lock. + // Similar to ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips, + // but lock is broken during the first iteration. 
+ + TTestHelper helper; + + auto readVersion = CreateVolatileSnapshot( + helper.Server, + {"/Root/movies", "/Root/table-1"}, + TDuration::Hours(1)); + + // write new data above snapshot + ExecSQL(helper.Server, helper.Sender, R"( + UPSERT INTO `/Root/table-1` + (key1, key2, key3, value) + VALUES + (4, 4, 4, 4444); + )"); + + const ui64 lockTxId = 1011121314; + + auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, readVersion); + request1->Record.SetLockTxId(lockTxId); + + AddRangeQuery<ui32>( + *request1, + {1, 1, 1}, + true, + {5, 5, 5}, + true + ); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult1, { + {1, 1, 1, 100}, + {3, 3, 3, 300}, + {5, 5, 5, 500}, + }); + + UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 0); + UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 1); + + helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1); + } + Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeLeftBorder) { TTestHelper helper; @@ -1987,6 +2350,67 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); } + Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips) { + // If we read in v1, write in v2, then write breaks lock. + // Because of out of order execution, v2 can happen before v1 + // and we should properly handle it in DS to break lock. 
+ + TTestHelper helper; + + auto readVersion = CreateVolatileSnapshot( + helper.Server, + {"/Root/movies", "/Root/table-1"}, + TDuration::Hours(1)); + + // write new data above snapshot + ExecSQL(helper.Server, helper.Sender, R"( + UPSERT INTO `/Root/table-1` + (key1, key2, key3, value) + VALUES + (4, 4, 4, 4444); + )"); + + const ui64 lockTxId = 1011121314; + + auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, readVersion); + request1->Record.SetLockTxId(lockTxId); + request1->Record.SetMaxRows(1); // set quota so that DS hangs waiting for ACK + + AddRangeQuery<ui32>( + *request1, + {1, 1, 1}, + true, + {5, 5, 5}, + true + ); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult1, { + {1, 1, 1, 100}, + }); + + // we had read only key=1, so didn't see invisible key=4 + UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); + + helper.SendReadAck("table-1", readResult1->Record, 100, 10000); + auto readResult2 = helper.WaitReadResult(); + CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { + {3, 3, 3, 300}, + {5, 5, 5, 500}, + }); + + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.TxLocksSize(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.BrokenTxLocksSize(), 1UL); + + const auto& lock = readResult1->Record.GetTxLocks(0); + const auto& brokenLock = readResult2->Record.GetBrokenTxLocks(0); + UNIT_ASSERT_VALUES_EQUAL(lock.GetLockId(), brokenLock.GetLockId()); + UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); + + helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1); + } + Y_UNIT_TEST(HandlePersistentSnapshotGoneInContinue) { // TODO } diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 94f8ffc241a..c4dd0c88622 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ 
b/ydb/core/tx/datashard/execution_unit.cpp @@ -126,6 +126,10 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, return CreateDropCdcStreamUnit(dataShard, pipeline); case EExecutionUnitKind::MoveIndex: return CreateMoveIndexUnit(dataShard, pipeline); + case EExecutionUnitKind::CheckRead: + return CreateCheckReadUnit(dataShard, pipeline); + case EExecutionUnitKind::ExecuteRead: + return CreateReadUnit(dataShard, pipeline); default: Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")"); } diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index ec62449d1aa..e03c77e2c2a 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -63,6 +63,8 @@ THolder<TExecutionUnit> CreateMoveTableUnit(TDataShard &dataShard, TPipeline &pi THolder<TExecutionUnit> CreateCreateCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline); } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index 4aefa0ca6e3..832854d966a 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -10,6 +10,7 @@ enum class EExecutionUnitKind : ui32 { CheckSnapshotTx, CheckDistributedEraseTx, CheckCommitWritesTx, + CheckRead, StoreDataTx, StoreSchemeTx, StoreSnapshotTx, @@ -35,6 +36,7 @@ enum class EExecutionUnitKind : ui32 { ExecuteKqpDataTx, ExecuteDistributedEraseTx, ExecuteCommitWritesTx, + ExecuteRead, CompleteOperation, ExecuteKqpScanTx, MakeScanSnapshot, 
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 167f265f6a3..6d9a0b5b459 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -105,6 +105,7 @@ enum class EOperationKind : ui32 { // Values [100, inf) are used for internal kinds. DirectTx = 101, + ReadTx = 102, }; class TBasicOpInfo { @@ -149,6 +150,7 @@ public: EOperationKind GetKind() const { return Kind; } bool IsDataTx() const { return Kind == EOperationKind::DataTx; } + bool IsReadTx() const { return Kind == EOperationKind::ReadTx; } bool IsDirectTx() const { return Kind == EOperationKind::DirectTx; } bool IsSchemeTx() const { return Kind == EOperationKind::SchemeTx; } bool IsReadTable() const { return Kind == EOperationKind::ReadTable; } diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 3bd0fdbb215..45037c3fa00 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -1,6 +1,7 @@ #pragma once #include "datashard.h" +#include "datashard_locks.h" #include <ydb/core/base/row_version.h> #include <ydb/core/tablet_flat/flat_row_eggs.h> @@ -131,6 +132,7 @@ public: std::vector<NTable::TTag> Columns; TRowVersion ReadVersion = TRowVersion::Max(); ui64 LockTxId = 0; + TLockInfo::TPtr Lock; bool ReportedLockBroken = false; // note that will be always overwritten by values from request @@ -145,6 +147,13 @@ public: std::shared_ptr<TEvDataShard::TEvRead> Request; + // parallel to Request->Keys, but real data only in indices, + // where in Request->Keys we have key prefix (here we have properly extended one). 
+ TVector<TSerializedCellVec> Keys; + + // same as Keys above, but for Request->Ranges.To + TVector<TSerializedCellVec> FromKeys; + // State itself // TQuota Quota; diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp new file mode 100644 index 00000000000..c51f1f82868 --- /dev/null +++ b/ydb/core/tx/datashard/read_op_unit.cpp @@ -0,0 +1,48 @@ +#include "datashard_read_operation.h" +#include "datashard_pipeline.h" +#include "execution_unit_ctors.h" + +namespace NKikimr::NDataShard { + +class TReadUnit : public TExecutionUnit { +public: + TReadUnit(TDataShard& self, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::ExecuteRead, true, self, pipeline) + { + } + + ~TReadUnit() = default; + + bool IsReadyToExecute(TOperation::TPtr op) const override { + return !op->HasRuntimeConflicts(); + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { + IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get()); + Y_VERIFY(readOperation); + + if (!readOperation->Execute(txc, ctx)) { + return EExecutionStatus::Restart; + } + + // TODO: check if we can send result right here to decrease latency + + // note that op has set locks itself, no ApplyLocks() required + DataShard.SubscribeNewLocks(ctx); + + return EExecutionStatus::DelayCompleteNoMoreRestarts; + } + + void Complete(TOperation::TPtr op, const TActorContext& ctx) override { + IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get()); + Y_VERIFY(readOperation); + + readOperation->Complete(ctx); + } +}; + +THolder<TExecutionUnit> CreateReadUnit(TDataShard& self, TPipeline& pipeline) { + return THolder(new TReadUnit(self, pipeline)); +} + +} // NKikimr::NDataShard |