aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-08-22 12:12:00 +0300
committereivanov89 <eivanov89@ydb.tech>2022-08-22 12:12:00 +0300
commit90ba2f22fde909f48fae87f8d00773acc6f793a8 (patch)
tree3f4f77d232818413e9beaf9086c44c7437e9475f
parent80386d4cd0c19b3d6930768677e46f2d25536bcc (diff)
downloadydb-90ba2f22fde909f48fae87f8d00773acc6f793a8.tar.gz
rewrite TEvRead to pipeline: properly handle dependencies, convert HEAD to MVCC, properly set locks
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt2
-rw-r--r--ydb/core/tx/datashard/check_read_unit.cpp38
-rw-r--r--ydb/core/tx/datashard/datashard.cpp52
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp1077
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h21
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h2
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_read_operation.h21
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp456
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execution_unit_ctors.h2
-rw-r--r--ydb/core/tx/datashard/execution_unit_kind.h2
-rw-r--r--ydb/core/tx/datashard/operation.h2
-rw-r--r--ydb/core/tx/datashard/read_iterator.h9
-rw-r--r--ydb/core/tx/datashard/read_op_unit.cpp48
17 files changed, 1395 insertions, 350 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 03c967165c4..f6690afa4b9 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -75,6 +75,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
@@ -186,6 +187,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp
diff --git a/ydb/core/tx/datashard/check_read_unit.cpp b/ydb/core/tx/datashard/check_read_unit.cpp
new file mode 100644
index 00000000000..28734b52205
--- /dev/null
+++ b/ydb/core/tx/datashard/check_read_unit.cpp
@@ -0,0 +1,38 @@
+#include "datashard_read_operation.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+namespace NKikimr::NDataShard {
+
+class TCheckReadUnit : public TExecutionUnit {
+public:
+ TCheckReadUnit(TDataShard& self, TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::CheckRead, true, self, pipeline)
+ {
+ }
+
+ ~TCheckReadUnit() = default;
+
+ bool IsReadyToExecute(TOperation::TPtr op) const override {
+ return !op->HasRuntimeConflicts();
+ }
+
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
+ IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get());
+ Y_VERIFY(readOperation != nullptr);
+
+ readOperation->CheckRequestAndInit(txc, ctx);
+ return EExecutionStatus::Executed;
+ }
+
+ void Complete(TOperation::TPtr, const TActorContext&) override {
+ // CheckRequestAndInit either already failed request and replied to user
+ // or prepared operation for further execution
+ }
+};
+
+THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard& self, TPipeline& pipeline) {
+ return THolder(new TCheckReadUnit(self, pipeline));
+}
+
+} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index ee76ffda77d..3540e87017a 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1641,36 +1641,54 @@ void TDataShard::SendImmediateWriteResult(
}
}
-void TDataShard::SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie) {
+void TDataShard::SendImmediateReadResult(
+ TMonotonic readTime,
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie,
+ const TActorId& sessionId)
+{
if (IsFollower() || !ReadOnlyLeaseEnabled()) {
// We just send possibly stale result (old behavior)
- Send(target, event, 0, cookie);
+ if (!sessionId) {
+ Send(target, event, 0, cookie);
+ } else {
+ SendViaSession(sessionId, target, SelfId(), event);
+ }
return;
}
struct TSendState : public TThrRefBase {
- TActorId Target;
- THolder<IEventBase> Event;
- ui64 Cookie;
-
- TSendState(const TActorId& target, IEventBase* event, ui64 cookie)
- : Target(target)
- , Event(event)
- , Cookie(cookie)
- { }
+ THolder<IEventHandle> Ev;
+
+ TSendState(const TActorId& sessionId, const TActorId& target, const TActorId& src, IEventBase* event, ui64 cookie)
+ {
+ const ui32 flags = 0;
+ Ev = MakeHolder<IEventHandle>(target, src, event, flags, cookie);
+
+ if (sessionId) {
+ Ev->Rewrite(TEvInterconnect::EvForward, sessionId);
+ }
+ }
};
if (!readTime) {
readTime = AppData()->MonotonicTimeProvider->Now();
}
- Executor()->ConfirmReadOnlyLease(readTime, [this, state = MakeIntrusive<TSendState>(target, event, cookie)] {
- Send(state->Target, state->Event.Release(), 0, state->Cookie);
+ Executor()->ConfirmReadOnlyLease(readTime,
+ [state = MakeIntrusive<TSendState>(sessionId, target, SelfId(), event, cookie)] {
+ TActivationContext::Send(state->Ev.Release());
});
}
-void TDataShard::SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie) {
- SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie);
+void TDataShard::SendImmediateReadResult(
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie,
+ const TActorId& sessionId)
+{
+ SendImmediateReadResult(TMonotonic::Zero(), target, event, cookie, sessionId);
}
void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
@@ -1931,7 +1949,9 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectReasons.push_back("decided to reject due to given RejectProbability");
}
- size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting());
+ size_t totalInFly =
+ ReadIteratorsInFly() + TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size()
+ + ProposeQueue.Size() + TxWaiting();
if (totalInFly > GetMaxTxInFly()) {
reject = true;
rejectReasons.push_back("MaxTxInFly was exceeded");
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index b0202b57d97..0efe5a5ab06 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_read_operation.h"
#include <ydb/core/formats/arrow_batch_builder.h>
@@ -219,6 +220,8 @@ class TReader {
ui64 BytesInResult = 0;
+ bool InvisibleRowSkipsMet = false;
+
NHPTimer::STime StartTime;
NHPTimer::STime EndTime;
@@ -243,7 +246,12 @@ public:
EndTime = StartTime;
}
- EReadStatus ReadRange(TTransactionContext& txc, const TActorContext& ctx, const TSerializedTableRange& range) {
+ EReadStatus ReadRange(
+ TTransactionContext& txc,
+ const TActorContext& ctx,
+ const TSerializedTableRange& range,
+ size_t index)
+ {
bool fromInclusive;
TSerializedCellVec keyFromCells;
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
@@ -251,7 +259,11 @@ public:
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
} else {
fromInclusive = range.FromInclusive;
- keyFromCells = TSerializedCellVec(range.From);
+ if (index < State.Request->Ranges.size() && range.From.GetCells().size() != TableInfo.KeyColumnCount) {
+ keyFromCells = State.FromKeys[index];
+ } else {
+ keyFromCells = TSerializedCellVec(range.From);
+ }
}
const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive);
@@ -290,15 +302,20 @@ public:
return result;
}
- EReadStatus ReadKey(TTransactionContext& txc, const TActorContext& ctx, const TSerializedCellVec& keyCells) {
+ EReadStatus ReadKey(
+ TTransactionContext& txc,
+ const TActorContext& ctx,
+ const TSerializedCellVec& keyCells,
+ size_t keyIndex)
+ {
if (keyCells.GetCells().size() != TableInfo.KeyColumnCount) {
// key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf]
TSerializedTableRange range;
- range.From = keyCells;
+ range.From = State.Keys[keyIndex];
range.To = keyCells;
range.ToInclusive = true;
range.FromInclusive = true;
- return ReadRange(txc, ctx, range);
+ return ReadRange(txc, ctx, range, State.Request->Ranges.size());
}
if (ColumnTypes.empty()) {
@@ -316,6 +333,7 @@ public:
NTable::TSelectStats stats;
auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion);
RowsSinceLastCheck += 1 + stats.InvisibleRowSkips;
+ InvisibleRowSkipsMet |= stats.InvisibleRowSkips > 0;
if (ready == NTable::EReady::Page) {
return EReadStatus::NeedData;
}
@@ -342,7 +360,7 @@ public:
return true;
const auto& range = State.Request->Ranges[FirstUnprocessedQuery];
- auto status = ReadRange(txc, ctx, range);
+ auto status = ReadRange(txc, ctx, range, FirstUnprocessedQuery);
switch (status) {
case EReadStatus::Done:
continue;
@@ -362,7 +380,7 @@ public:
return true;
const auto& key = State.Request->Keys[FirstUnprocessedQuery];
- auto status = ReadKey(txc, ctx, key);
+ auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery);
switch (status) {
case EReadStatus::Done:
continue;
@@ -452,6 +470,9 @@ public:
record.SetReadId(State.ReadId);
record.SetSeqNo(State.SeqNo + 1);
+ record.MutableSnapshot()->SetStep(State.ReadVersion.Step);
+ record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId);
+
NKikimrTxDataShard::TReadContinuationToken continuationToken;
continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery);
@@ -471,6 +492,7 @@ public:
}
ui64 GetRowsRead() const { return RowsRead; }
+ bool HasInvisibleRowSkips() const { return InvisibleRowSkipsMet; }
private:
bool OutOfQuota() const {
@@ -528,6 +550,7 @@ private:
BlockBuilder.AddRow(TDbTupleRef(), rowValues);
++RowsRead;
+ InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0;
RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats);
if (ShouldStop()) {
return EReadStatus::StoppedByLimit;
@@ -536,6 +559,7 @@ private:
// last iteration to Page or Gone also might have deleted or invisible rows
RowsSinceLastCheck += ResetRowStats(iter->Stats);
+ InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0;
// TODO: consider restart when Page and too few data read
// (how much is too few, less than user's limit?)
@@ -556,25 +580,49 @@ const NHPTimer::STime TReader::MaxCyclesPerIteration =
} // namespace
-class TDataShard::TTxRead : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+class TDataShard::TReadOperation : public TOperation, public IReadOperation {
+ TDataShard* Self;
TActorId Sender;
std::shared_ptr<TEvDataShard::TEvRead> Request;
+ NMiniKQL::IEngineFlat::TValidationInfo ValidationInfo;
+
+ size_t ExecuteCount = 0;
+ bool ResultSent = false;
+
std::unique_ptr<TEvDataShard::TEvReadResult> Result;
- // on each Execute() set by CheckRequestAndInit
std::unique_ptr<IBlockBuilder> BlockBuilder;
TShortTableInfo TableInfo;
std::unique_ptr<TReader> Reader;
+ static constexpr ui32 Flags = NTxDataShard::TTxFlags::ReadOnly | NTxDataShard::TTxFlags::Immediate;
+
public:
- TTxRead(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev)
- : TBase(ds)
+ TReadOperation(TDataShard* ds, ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev)
+ : TOperation(TBasicOpInfo(txId, EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex))
+ , Self(ds)
, Sender(ev->Sender)
, Request(ev->Release().Release())
{}
- TTxType GetTxType() const override { return TXTYPE_READ; }
+ void BuildExecutionPlan(bool loaded) override
+ {
+ Y_VERIFY(GetExecutionPlan().empty());
+ Y_VERIFY(!loaded);
+
+ TVector<EExecutionUnitKind> plan;
+ plan.push_back(EExecutionUnitKind::CheckRead);
+ plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
+ plan.push_back(EExecutionUnitKind::ExecuteRead);
+ plan.push_back(EExecutionUnitKind::CompletedOperations);
+
+ RewriteExecutionPlan(plan);
+ }
+
+ const NMiniKQL::IEngineFlat::TValidationInfo& GetKeysInfo() const override {
+ return ValidationInfo;
+ }
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
TReadIteratorId readId(Sender, Request->Record.GetReadId());
@@ -583,12 +631,13 @@ public:
// iterator has been aborted
return true;
}
-
- Y_ASSERT(it->second);
+ Y_VERIFY(it->second);
auto& state = *it->second;
- state.State = TReadIteratorState::EState::Init;
+ Y_VERIFY(state.State == TReadIteratorState::EState::Executing);
- Result.reset(new TEvDataShard::TEvReadResult());
+ ++ExecuteCount;
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Execute read# " << ExecuteCount
+ << ", request: " << Request->Record);
switch (Self->State) {
case TShardState::Ready:
@@ -637,122 +686,92 @@ public:
return true;
}
- // TODO: in case of restart we recheck request and rebuild state,
- // I believe it is needed because between restart schema can change,
- // table can be deleted, version can gone, etc
- bool finished = finished;
- CheckRequestAndInit(txc, ctx, state, finished);
- if (state.State != TReadIteratorState::EState::Executing) {
- return finished;
- }
+ // we need to check that scheme version is still correct, table presents and
+ // version is still available
- Y_ASSERT(Reader);
+ if (state.PathId.OwnerId != Self->TabletID()) {
+ // owner is schemeshard, read user table
+ auto tableId = state.PathId.LocalPathId;
+ auto it = Self->TableInfos.find(tableId);
+ if (it == Self->TableInfos.end()) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Unknown table id: " << tableId);
+ return true;
+ }
+ auto& userTableInfo = it->second;
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read: " << Request->Record);
+ const ui64 ownerId = state.PathId.OwnerId;
+ TSnapshotKey snapshotKey(
+ ownerId,
+ tableId,
+ state.ReadVersion.Step,
+ state.ReadVersion.TxId);
+
+ bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool allowMvcc = isMvccVersion && !Self->IsFollower();
+ bool snapshotFound = false;
+ if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) {
+ // TODO: do we need to acquire?
+ snapshotFound = true;
+ } else if (allowMvcc) {
+ snapshotFound = true;
+ }
+
+ if (!snapshotFound) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Table id " << tableId << " lost snapshot at "
+ << state.ReadVersion << " shard " << Self->TabletID()
+ << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
+ << (Self->IsFollower() ? " RO replica" : ""));
+ return true;
+ }
- bool res = Reader->Read(txc, ctx);
- if (res && state.Request->Record.HasLockTxId()) {
- // note that we set locks only when finish transaction, i.e. we have read something
- // without page faults, etc and really finish transaction
- auto lock = AcquireLock(ctx, state);
- if (!lock) {
- // TODO: clarify, when AcquireLock might fail and if we can do anything with it
- SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Failed to set lock");
+ if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::SCHEME_ERROR,
+ TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion()
+ << ", requested table schemaversion " << state.SchemaVersion);
return true;
}
}
- return res;
+ if (!Read(txc, ctx, state))
+ return false;
+
+ if (Request->Record.HasLockTxId()) {
+ // note that we set locks only when first read finish transaction,
+ // i.e. we have read something without page faults
+ AcquireLock(ctx, state);
+ }
+
+ Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
+ return true;
}
- void Complete(const TActorContext& ctx) override {
+ void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override {
TReadIteratorId readId(Sender, Request->Record.GetReadId());
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
- // the one who removed the iterator should have reply to user
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " has been invalidated before TTxRead::Complete()");
+ // iterator has been aborted
+ Abort(EExecutionUnitKind::CompletedOperations);
return;
}
-
Y_VERIFY(it->second);
auto& state = *it->second;
+ Y_VERIFY(state.State == TReadIteratorState::EState::Init);
- if (!Result) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxRead::Execute() finished without Result, aborting");
- Self->DeleteReadIterator(it);
-
- Result.reset(new TEvDataShard::TEvReadResult());
- SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
- Result->Record.SetReadId(readId.ReadId);
- SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
- return;
- }
-
- // error happened and status set
- auto& record = Result->Record;
- if (record.HasStatus()) {
- record.SetReadId(readId.ReadId);
- record.SetSeqNo(state.SeqNo + 1);
- SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxRead::Execute() finished with error, aborting: " << record.DebugString());
- Self->DeleteReadIterator(it);
- return;
- }
-
- Y_ASSERT(Reader);
- Y_ASSERT(BlockBuilder);
-
- Reader->FillResult(*Result);
- SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
-
- // note that we save the state only when there're unread queries
- if (Reader->HasUnreadQueries()) {
- Y_ASSERT(it->second);
- auto& state = *it->second;
- Reader->UpdateState(state);
- if (!state.IsExhausted()) {
- ctx.Send(
- Self->SelfId(),
- new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId()));
- }
- } else {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " finished in read");
- Self->DeleteReadIterator(it);
- }
- }
+ Result.reset(new TEvDataShard::TEvReadResult());
-private:
- void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state, bool& finished) {
const auto& record = Request->Record;
- if (!Request->Keys.empty() && !Request->Ranges.empty()) {
- SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden");
- finished = true;
- return;
- }
-
- if (Request->Keys.empty() && Request->Ranges.empty()) {
- SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Neither keys nor ranges");
- finished = true;
- return;
- }
-
- if (record.HasProgram()) {
- SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "PushDown is not supported");
- finished = true;
- return;
- }
-
- if (record.ColumnsSize() == 0) {
- SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing Columns");
- finished = true;
- return;
- }
-
state.ReadId = record.GetReadId();
state.PathId = TPathId(
record.GetTableId().GetOwnerId(),
@@ -777,47 +796,27 @@ private:
state.Reverse = record.GetReverse();
- // note that we must call SyncSchemeOnFollower before we do any kind of checks
- if (Self->IsFollower()) {
- NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
- TString errMessage;
-
- if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
- finished = false;
- return;
- }
-
- if (status != NKikimrTxDataShard::TError::OK) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::INTERNAL_ERROR,
- "Follower not ready");
- finished = true;
- return;
- }
- }
-
if (state.PathId.OwnerId != Self->TabletID()) {
// owner is schemeshard, read user table
if (state.PathId.OwnerId != Self->GetPathOwnerId()) {
- SetStatusError(
- Result->Record,
+ SendErrorAndAbort(
+ ctx,
+ state,
Ydb::StatusIds::BAD_REQUEST,
TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId
<< ", tableId: " << state.PathId.LocalPathId
<< ", from wrong owner: " << Self->GetPathOwnerId());
- finished = true;
return;
}
- auto tableId = state.PathId.LocalPathId;
+ const auto tableId = state.PathId.LocalPathId;
auto it = Self->TableInfos.find(tableId);
if (it == Self->TableInfos.end()) {
- SetStatusError(
- Result->Record,
+ SendErrorAndAbort(
+ ctx,
+ state,
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId);
- finished = true;
+ TStringBuilder() << "Unknown table id: " << tableId);
return;
}
@@ -825,93 +824,303 @@ private:
TableInfo = TShortTableInfo(userTableInfo);
if (userTableInfo->IsBackup) {
- SetStatusError(
- Result->Record,
+ SendErrorAndAbort(
+ ctx,
+ state,
Ydb::StatusIds::BAD_REQUEST,
"Can't read from a backup table");
- finished = true;
return;
}
- if (!state.ReadVersion.IsMax()) {
- ui64 ownerId = state.PathId.OwnerId;
- TSnapshotKey snapshotKey(
- ownerId,
- tableId,
- state.ReadVersion.Step,
- state.ReadVersion.TxId);
-
- bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
- bool allowMvcc = isMvccVersion && !Self->IsFollower();
- if (!Self->GetSnapshotManager().FindAvailable(snapshotKey) && !allowMvcc) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Table id " << tableId << " has no snapshot at "
- << state.ReadVersion << " shard " << Self->TabletID()
- << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
- << (Self->IsFollower() ? " RO replica" : ""));
- finished = true;
- return;
- }
+ // we must have chosen the version
+ Y_VERIFY(!state.ReadVersion.IsMax());
+
+ const ui64 ownerId = state.PathId.OwnerId;
+ TSnapshotKey snapshotKey(
+ ownerId,
+ tableId,
+ state.ReadVersion.Step,
+ state.ReadVersion.TxId);
+
+ bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool allowMvcc = isMvccVersion && !Self->IsFollower();
+ bool snapshotFound = false;
+ if (Self->GetSnapshotManager().FindAvailable(snapshotKey)) {
+ // TODO: do we need to acquire?
+ snapshotFound = true;
+ SetUsingSnapshotFlag();
+ } else if (allowMvcc) {
+ snapshotFound = true;
+ SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId));
+ }
+
+ if (!snapshotFound) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Table id " << tableId << " has no snapshot at "
+ << state.ReadVersion << " shard " << Self->TabletID()
+ << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
+ << (Self->IsFollower() ? " RO replica" : ""));
+ return;
}
state.SchemaVersion = userTableInfo->GetTableSchemaVersion();
if (record.GetTableId().HasSchemaVersion()) {
if (state.SchemaVersion != record.GetTableId().GetSchemaVersion()) {
- SetStatusError(
- Result->Record,
+ SendErrorAndAbort(
+ ctx,
+ state,
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Wrong schemaversion " << record.GetTableId().GetSchemaVersion()
<< " requested, table schemaversion " << state.SchemaVersion);
- finished = true;
return;
}
}
+ // TODO: remove later, when we sure that key prefix is properly
+ // interpreted same way everywhere: i.e. treated as 0 at the left and
+ // inf on the right.
+ // We really do weird transformations here, not sure if we can do better though
+ for (size_t i = 0; i < Request->Ranges.size(); ++i) {
+ const auto& key = Request->Ranges[i].From;
+ if (key.GetCells().size() == TableInfo.KeyColumnCount)
+ continue;
+
+ if (state.FromKeys.size() != Request->Ranges.size()) {
+ state.FromKeys.resize(Request->Ranges.size());
+ }
+
+ // we can safely use cells referencing original Request->Keys[x],
+ // because request will live until the end
+ TVector<TCell> extendedCells;
+ extendedCells.reserve(TableInfo.KeyColumnCount);
+ for (const auto& cell: key.GetCells()) {
+ extendedCells.emplace_back(cell);
+ }
+ // extend with nulls
+ extendedCells.resize(TableInfo.KeyColumnCount, TCell());
+
+ // most evil part: serialize and then parse...
+ state.FromKeys[i] = TSerializedCellVec(TSerializedCellVec::Serialize(extendedCells));
+ }
+
+ // TODO: remove later, when we sure that key prefix is properly
+ // interpreted same way everywhere: i.e. treated as 0 at the left and
+ // inf on the right.
+ // We really do weird transformations here, not sure if we can do better though
+ for (size_t i = 0; i < Request->Keys.size(); ++i) {
+ const auto& key = Request->Keys[i];
+ if (key.GetCells().size() == TableInfo.KeyColumnCount)
+ continue;
+
+ if (state.Keys.size() != Request->Keys.size()) {
+ state.Keys.resize(Request->Keys.size());
+ }
+
+ // we can safely use cells referencing original Request->Keys[x],
+ // because request will live until the end
+ TVector<TCell> extendedCells;
+ extendedCells.reserve(TableInfo.KeyColumnCount);
+ for (const auto& cell: key.GetCells()) {
+ extendedCells.emplace_back(cell);
+ }
+ // extend with nulls
+ extendedCells.resize(TableInfo.KeyColumnCount, TCell());
+
+ // most evil part: serialize and then parse...
+ state.Keys[i] = TSerializedCellVec(TSerializedCellVec::Serialize(extendedCells));
+ }
+
userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now();
} else {
// DS is owner, read system table
- if (state.PathId.LocalPathId >= TDataShard::Schema::MinLocalTid) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Only sys tables can be read by localTid, table "
+
+ auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId);
+ if (!schema) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
- finished = true;
return;
}
+ TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema);
+ }
- if (!state.ReadVersion.IsMax()) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Only HEAD read from sys tables is allowed");
- finished = true;
+ state.Columns.reserve(record.ColumnsSize());
+ for (auto col: record.GetColumns()) {
+ auto it = TableInfo.Columns.find(col);
+ if (it == TableInfo.Columns.end()) {
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::SCHEME_ERROR,
+ TStringBuilder() << "Unknown column: " << col);
return;
}
- if (state.Format != NKikimrTxDataShard::CELLVEC) {
- SetStatusError(
- Result->Record,
+ state.Columns.push_back(col);
+ }
+
+ {
+ auto p = CreateBlockBuilder(state, TableInfo);
+ if (!p.first) {
+ SendErrorAndAbort(
+ ctx,
+ state,
Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Sys tables can be read only in cellvec format, but requested "
- << (int)NKikimrTxDataShard::CELLVEC);
- finished = true;
+ p.second);
return;
}
+ std::swap(BlockBuilder, p.first);
+ }
- if (record.GetTableId().HasSchemaVersion()) {
+ state.Request = Request;
+
+ Y_ASSERT(Result);
+
+ state.State = TReadIteratorState::EState::Executing;
+ Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
+
+ PrepareValidationInfo(ctx, state);
+ }
+
+ void SendResult(const TActorContext& ctx) override {
+ if (ResultSent)
+ return;
+ ResultSent = true;
+
+ TReadIteratorId readId(Sender, Request->Record.GetReadId());
+ auto it = Self->ReadIterators.find(readId);
+ if (it == Self->ReadIterators.end()) {
+ // the one who removed the iterator should have reply to user
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " has been invalidated before TReadOperation::SendResult()");
+ return;
+ }
+
+ Y_VERIFY(it->second);
+ auto& state = *it->second;
+
+ if (!Result) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TReadOperation::Execute() finished without Result, aborting");
+ Self->DeleteReadIterator(it);
+
+ Result.reset(new TEvDataShard::TEvReadResult());
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
+ Result->Record.SetReadId(readId.ReadId);
+ Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+ return;
+ }
+
+ // error happened and status set
+ auto& record = Result->Record;
+ if (record.HasStatus()) {
+ record.SetReadId(readId.ReadId);
+ record.SetSeqNo(state.SeqNo + 1);
+ Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TReadOperation::Execute() finished with error, aborting: " << record.DebugString());
+ Self->DeleteReadIterator(it);
+ return;
+ }
+
+ Y_ASSERT(Reader);
+ Y_ASSERT(BlockBuilder);
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << ", firstUnprocessed# " << state.FirstUnprocessedQuery);
+
+ Reader->FillResult(*Result);
+ Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ SendResult(ctx);
+
+ TReadIteratorId readId(Sender, Request->Record.GetReadId());
+ auto it = Self->ReadIterators.find(readId);
+ if (it == Self->ReadIterators.end()) {
+ // the one who removed the iterator should have reply to user
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " has been invalidated before TReadOperation::Complete()");
+ return;
+ }
+
+ Y_VERIFY(it->second);
+
+ // note that we save the state only when there're unread queries
+ if (Reader->HasUnreadQueries()) {
+ Y_ASSERT(it->second);
+ auto& state = *it->second;
+ Reader->UpdateState(state);
+ if (!state.IsExhausted()) {
+ ctx.Send(
+ Self->SelfId(),
+ new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId()));
+ }
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " finished in read");
+ Self->DeleteReadIterator(it);
+ }
+ }
+
+private:
+ void SendErrorAndAbort(
+ const TActorContext& ctx,
+ TReadIteratorState& state,
+ Ydb::StatusIds::StatusCode code,
+ const TString& msg)
+ {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " aborted read iterator# "
+ << state.ReadId << " with msg " << msg);
+
+ if (!Result)
+ Result.reset(new TEvDataShard::TEvReadResult());
+
+ SetStatusError(Result->Record, code, msg);
+ Result->Record.SetReadId(state.ReadId);
+ Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+
+ // note that we don't need to execute any other unit
+ Abort(EExecutionUnitKind::CompletedOperations);
+
+ TReadIteratorId readId(Sender, state.ReadId);
+ Self->DeleteReadIterator(readId);
+ }
+
+ // return semantics is like in Execute()
+ bool Read(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) {
+ const auto& tableId = state.PathId.LocalPathId;
+ if (state.PathId.OwnerId == Self->GetPathOwnerId()) {
+ auto it = Self->TableInfos.find(tableId);
+ if (it == Self->TableInfos.end()) {
SetStatusError(
Result->Record,
- Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Requesting system stable owned " << state.PathId.OwnerId
- << ", localTid: " << state.PathId.LocalPathId
- << ", with schema: " << record.GetTableId().GetSchemaVersion());
- finished = true;
- return;
+ Ydb::StatusIds::NOT_FOUND,
+ TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId);
+ return true;
+ }
+ auto userTableInfo = it->second;
+ TableInfo = TShortTableInfo(userTableInfo);
+ auto currentSchemaVersion = TableInfo.SchemaVersion;
+ if (state.SchemaVersion != currentSchemaVersion) {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::SCHEME_ERROR,
+ TStringBuilder() << "Schema changed, current " << currentSchemaVersion
+ << ", requested table schemaversion " << state.SchemaVersion);
+ return true;
}
+ userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now();
+ } else {
auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId);
if (!schema) {
SetStatusError(
@@ -919,61 +1128,30 @@ private:
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
- finished = true;
- return;
+ return true;
}
TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema);
}
- if (Self->IsFollower()) {
- if (!state.ReadVersion.IsMax()) {
- // check that follower has this version
- NIceDb::TNiceDb db(txc.DB);
- TRowVersion lastCompleteTx;
- if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step)) {
- finished = false;
- return;
- }
- if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId)) {
- finished = false;
- return;
- }
-
- if (state.ReadVersion > lastCompleteTx) {
- // it would be better to have something like retry later
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::INTERNAL_ERROR,
- TStringBuilder() << "Version " << state.ReadVersion
- << " is not available on follower yet");
- finished = true;
- return;
- }
- } else {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::UNSUPPORTED,
- "HEAD version on followers is unsupported");
- finished = true;
- return;
- }
-
- // TODO: check that no lock requested
- }
+ ui64 ownerId = state.PathId.OwnerId;
+ TSnapshotKey snapshotKey(
+ ownerId,
+ tableId,
+ state.ReadVersion.Step,
+ state.ReadVersion.TxId);
- state.Columns.reserve(record.ColumnsSize());
- for (auto col: record.GetColumns()) {
- auto it = TableInfo.Columns.find(col);
- if (it == TableInfo.Columns.end()) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::SCHEME_ERROR,
- TStringBuilder() << "Unknown column: " << col);
- finished = true;
- return;
- }
+ bool isMvccVersion = state.ReadVersion >= Self->GetSnapshotManager().GetLowWatermark();
+ bool allowMvcc = isMvccVersion && !Self->IsFollower();
+ if (!Self->GetSnapshotManager().FindAvailable(snapshotKey) && !allowMvcc) {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::ABORTED,
+ TStringBuilder() << "Table id " << tableId << " lost snapshot at "
+ << state.ReadVersion << " shard " << Self->TabletID()
+ << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
+ << (Self->IsFollower() ? " RO replica" : ""));
- state.Columns.push_back(col);
+ return true;
}
{
@@ -983,23 +1161,101 @@ private:
Result->Record,
Ydb::StatusIds::BAD_REQUEST,
p.second);
- finished = true;
- return;
+ return true;
}
std::swap(BlockBuilder, p.first);
}
- state.Request = Request;
-
Y_ASSERT(Result);
- state.State = TReadIteratorState::EState::Executing;
Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
+ return Reader->Read(txc, ctx);
+ }
+
+ void PrepareValidationInfo(const TActorContext&, const TReadIteratorState& state) {
+ TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion);
+
+ TVector<NScheme::TTypeId> keyTypes;
+
+ TVector<TKeyDesc::TColumnOp> columnOps;
+ columnOps.reserve(TableInfo.Columns.size());
+ for (const auto& it: TableInfo.Columns) {
+ const auto& column = it.second;
+ TKeyDesc::TColumnOp op;
+ op.Column = it.first;
+ op.Operation = TKeyDesc::EColumnOperation::Read;
+ op.ExpectedType = column.Type;
+ columnOps.emplace_back(std::move(op));
+ }
- finished = false;
+ if (!state.Request->Keys.empty()) {
+ for (size_t i = 0; i < state.Request->Keys.size(); ++i) {
+ THolder<TKeyDesc> desc;
+ const auto& key = state.Request->Keys[i];
+ if (key.GetCells().size() != TableInfo.KeyColumnCount) {
+ // key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf]
+ TTableRange range(
+ state.Keys[i].GetCells(),
+ true,
+ key.GetCells(),
+ true);
+
+ desc = MakeHolder<TKeyDesc>(
+ tableId,
+ range,
+ TKeyDesc::ERowOperation::Read,
+ TableInfo.KeyColumnTypes,
+ columnOps,
+ state.Quota.Rows,
+ state.Quota.Bytes,
+ state.Reverse);
+ } else {
+ desc = MakeHolder<TKeyDesc>(
+ tableId,
+ TTableRange(key.GetCells()),
+ TKeyDesc::ERowOperation::Read,
+ TableInfo.KeyColumnTypes,
+ columnOps,
+ state.Quota.Rows,
+ state.Quota.Bytes,
+ state.Reverse);
+ }
+
+ ValidationInfo.Keys.emplace_back(
+ NMiniKQL::IEngineFlat::TValidatedKey(
+ std::move(desc),
+ /* isWrite */ false));
+ ++ValidationInfo.ReadsCount;
+ }
+ } else {
+ // since no keys, then we must have ranges (has been checked initially)
+ for (size_t i = 0; i < state.Request->Ranges.size(); ++i) {
+ TTableRange range = state.Request->Ranges[i].ToTableRange();
+ if (range.From.size() != TableInfo.KeyColumnCount)
+ range.From = state.FromKeys[i].GetCells();
+
+ auto desc = MakeHolder<TKeyDesc>(
+ tableId,
+ range,
+ TKeyDesc::ERowOperation::Read,
+ TableInfo.KeyColumnTypes,
+ columnOps,
+ state.Quota.Rows,
+ state.Quota.Bytes,
+ state.Reverse);
+
+ ValidationInfo.Keys.emplace_back(
+ NMiniKQL::IEngineFlat::TValidatedKey(
+ std::move(desc),
+ /* isWrite */ false));
+ ++ValidationInfo.ReadsCount;
+ }
+ }
+
+ ValidationInfo.Loaded = true;
}
- TLockInfo::TPtr AcquireLock(const TActorContext& ctx, TReadIteratorState& state) {
+ void AcquireLock(const TActorContext& ctx, TReadIteratorState& state) {
auto& sysLocks = Self->SysLocksTable();
auto& locker = sysLocks.GetLocker();
@@ -1008,13 +1264,15 @@ private:
TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion);
TLockInfo::TPtr lock;
+ state.LockTxId = lockTxId;
+
if (!state.Request->Keys.empty()) {
for (size_t i = 0; i < state.Request->Keys.size(); ++i) {
const auto& key = state.Request->Keys[i];
if (key.GetCells().size() != TableInfo.KeyColumnCount) {
// key prefix, treat it as range [prefix, 0, 0] - [prefix, +inf, +inf]
TTableRange lockRange(
- key.GetCells(),
+ state.Keys[i].GetCells(),
true,
key.GetCells(),
true);
@@ -1024,52 +1282,140 @@ private:
TPointKey pointKey = locker.MakePoint(tableId, key.GetCells());
lock = locker.AddPointLock(lockTxId, lockNodeId, pointKey, state.ReadVersion);
}
- if (!lock)
- return nullptr;
}
} else {
// since no keys, then we must have ranges (has been checked initially)
for (size_t i = 0; i < state.Request->Ranges.size(); ++i) {
- const auto& range = state.Request->Ranges[i];
-
- TTableRange lockRange(
- range.From.GetCells(),
- range.FromInclusive,
- range.To.GetCells(),
- range.ToInclusive);
-
- TRangeKey rangeKey = locker.MakeRange(tableId, lockRange);
+ auto range = state.Request->Ranges[i].ToTableRange();
+ if (range.From.size() != TableInfo.KeyColumnCount)
+ range.From = state.FromKeys[i].GetCells();
+ TRangeKey rangeKey = locker.MakeRange(tableId, range);
lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion);
- if (!lock)
- return nullptr;
}
}
- Y_VERIFY(lock);
+ ui64 counter;
+ ui64 lockId;
+ bool isBroken;
+ if (lock) {
+ counter = lock->GetCounter(state.ReadVersion);
+ lockId = lock->GetLockId();
+ isBroken = lock->IsBroken(state.ReadVersion);
+ } else {
+ counter = TSysTables::TLocksTable::TLock::ErrorNotSet;
+ lockId = lockTxId;
+ isBroken = true;
+ }
+
+ if (!isBroken && Reader->HasInvisibleRowSkips()) {
+ locker.BreakLock(lockTxId, TRowVersion::Min());
+ isBroken = true;
+ counter = TSysTables::TLocksTable::TLock::ErrorAlreadyBroken;
+ }
- auto counter = lock->GetCounter(state.ReadVersion);
sysLocks.UpdateCounters(counter);
NKikimrTxDataShard::TLock *addLock;
- if (!lock->IsBroken(state.ReadVersion)) {
+ if (!isBroken) {
addLock = Result->Record.AddTxLocks();
} else {
addLock = Result->Record.AddBrokenTxLocks();
}
- addLock->SetLockId(lock->GetLockId());
+ addLock->SetLockId(lockId);
addLock->SetDataShard(Self->TabletID());
addLock->SetGeneration(Self->Generation());
- addLock->SetCounter(lock->GetCounter());
+ addLock->SetCounter(counter);
addLock->SetSchemeShard(state.PathId.OwnerId);
addLock->SetPathId(state.PathId.LocalPathId);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
- << " Acquired lock# " << lock->GetLockId() << ", counter# " << lock->GetCounter()
+ << " Acquired lock# " << lockId << ", counter# " << counter
<< " for " << state.PathId);
- state.LockTxId = lockTxId;
- return lock;
+ state.Lock = lock; // note that might be nullptr
+ }
+};
+
+class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+ TEvDataShard::TEvRead::TPtr Ev;
+ TReadIteratorId ReadId;
+
+ TOperation::TPtr Op;
+ TVector<EExecutionUnitKind> CompleteList;
+
+public:
+ TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev)
+ : TBase(ds)
+ , Ev(std::move(ev))
+ , ReadId(Ev->Sender, Ev->Get()->Record.GetReadId())
+ {}
+
+ TTxType GetTxType() const override { return TXTYPE_READ; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline execute"
+ << ": at tablet# " << Self->TabletID());
+
+ auto it = Self->ReadIterators.find(ReadId);
+ if (it == Self->ReadIterators.end()) {
+ // iterator aborted
+ return true;
+ }
+
+ auto& state = *it->second;
+
+ // If tablet is in follower mode then we should sync scheme
+ // before we build and check operation.
+ if (Self->IsFollower()) {
+ NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
+ TString errMessage;
+
+ if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
+ return false;
+ }
+
+ if (status != NKikimrTxDataShard::TError::OK) {
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(
+ result->Record,
+ Ydb::StatusIds::INTERNAL_ERROR,
+ TStringBuilder() << "Failed to sync follower: " << errMessage);
+ result->Record.SetReadId(ReadId.ReadId);
+ SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release());
+
+ return true;
+ }
+ }
+
+ if (Ev) {
+ const ui64 tieBreaker = Self->NextTieBreakerIndex++;
+ Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev);
+ Op->BuildExecutionPlan(false);
+ Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
+
+ Ev = nullptr;
+ }
+
+ auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx);
+ if (!CompleteList.empty()) {
+ return true;
+ } else if (status == EExecutionStatus::Restart) {
+ return false;
+ } else {
+ Op = nullptr;
+ return true;
+ }
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete"
+ << ": at tablet# " << Self->TabletID());
+
+ Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
}
};
@@ -1099,12 +1445,18 @@ public:
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
// read has been aborted
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# "
+ << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << " didn't found state");
return true;
}
Y_ASSERT(it->second);
auto& state = *it->second;
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# "
+ << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
+ << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
+
Result.reset(new TEvDataShard::TEvReadResult());
const auto& tableId = state.PathId.LocalPathId;
@@ -1115,6 +1467,7 @@ public:
Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId);
+ SendResult(ctx);
return true;
}
auto userTableInfo = it->second;
@@ -1126,6 +1479,7 @@ public:
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Schema changed, current " << currentSchemaVersion
<< ", requested table schemaversion " << state.SchemaVersion);
+ SendResult(ctx);
return true;
}
@@ -1138,6 +1492,7 @@ public:
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
+ SendResult(ctx);
return true;
}
TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema);
@@ -1160,7 +1515,7 @@ public:
<< state.ReadVersion << " shard " << Self->TabletID()
<< " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
<< (Self->IsFollower() ? " RO replica" : ""));
-
+ SendResult(ctx);
return true;
}
@@ -1171,6 +1526,7 @@ public:
Result->Record,
Ydb::StatusIds::BAD_REQUEST,
p.second);
+ SendResult(ctx);
return true;
}
std::swap(BlockBuilder, p.first);
@@ -1179,25 +1535,26 @@ public:
Y_ASSERT(Result);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
- << " ReadContinue: " << Ev->Get()->Reader << "," << Ev->Get()->ReadId);
+ << " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
+ << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery);
Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
- return Reader->Read(txc, ctx);
+ if (Reader->Read(txc, ctx)) {
+ SendResult(ctx);
+ return true;
+ }
+ return false;
}
- void Complete(const TActorContext& ctx) override {
- // TODO: it is complete copypaste from TEvRead::Complete()
+ void Complete(const TActorContext&) override {
+ // nothing to do
+ }
+ void SendResult(const TActorContext& ctx) {
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
- if (it == Self->ReadIterators.end()) {
- // the one who removed the iterator should have reply to user
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " has been invalidated before TTxReadContinue::Complete()");
- return;
- }
-
+ Y_VERIFY(it != Self->ReadIterators.end());
Y_VERIFY(it->second);
auto& state = *it->second;
@@ -1209,7 +1566,7 @@ public:
Result.reset(new TEvDataShard::TEvReadResult());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
- SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
return;
}
@@ -1218,43 +1575,45 @@ public:
if (record.HasStatus()) {
record.SetSeqNo(state.SeqNo + 1);
record.SetReadId(readId.ReadId);
- SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString());
Self->DeleteReadIterator(it);
return;
}
- if (state.LockTxId) {
- auto& sysLocks = Self->SysLocksTable();
- auto& locker = sysLocks.GetLocker();
- auto lock = locker.GetLock(state.LockTxId, state.ReadVersion, true);
- if (!lock) {
- // just sanity check, should not happen
- SetStatusError(record, Ydb::StatusIds::INTERNAL_ERROR, "Failed to find lock");
- record.SetSeqNo(state.SeqNo + 1);
- record.SetReadId(readId.ReadId);
- SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
- LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxReadContinue::Execute() lost lock for lockTxId# " << state.LockTxId
- << " in version# " << state.ReadVersion << ", aborting: " << record.DebugString());
- Self->DeleteReadIterator(it);
- return;
+ if (state.Lock && !state.ReportedLockBroken) {
+ bool isBroken = false;
+ ui64 counter;
+ ui64 lockId;
+ if (state.Lock->IsBroken(state.ReadVersion)) {
+ isBroken = true;
+ counter = state.Lock->GetCounter(state.ReadVersion);
+ lockId = state.Lock->GetLockId();
+
+ } else if (Reader->HasInvisibleRowSkips()) {
+ isBroken = true;
+ counter = TSysTables::TLocksTable::TLock::ErrorBroken;
+ lockId = state.LockTxId;
+
+ auto& sysLocks = Self->SysLocksTable();
+ auto& locker = sysLocks.GetLocker();
+ locker.BreakLock(state.LockTxId, TRowVersion::Min());
+ sysLocks.UpdateCounters(counter);
}
- if (lock->IsBroken() && !state.ReportedLockBroken) {
+ if (isBroken) {
state.ReportedLockBroken = true;
NKikimrTxDataShard::TLock *addLock = record.AddBrokenTxLocks();
- addLock->SetLockId(lock->GetLockId());
+ addLock->SetLockId(lockId);
addLock->SetDataShard(Self->TabletID());
addLock->SetGeneration(Self->Generation());
- addLock->SetCounter(lock->GetCounter());
+ addLock->SetCounter(counter);
addLock->SetSchemeShard(state.PathId.OwnerId);
addLock->SetPathId(state.PathId.LocalPathId);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxReadContinue::Execute() found broken lock# " << lock->GetLockId()
- << " with lockTxId# " << state.LockTxId);
+ << " TTxReadContinue::Execute() found broken lock# " << lockId);
}
}
@@ -1262,7 +1621,7 @@ public:
Y_ASSERT(BlockBuilder);
Reader->FillResult(*Result);
- SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
if (Reader->HasUnreadQueries()) {
Y_ASSERT(it->second);
@@ -1285,7 +1644,15 @@ public:
};
void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
+ if (MediatorStateWaiting) {
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
+ // note that ins some cases we mutate this request below
+ const auto* request = ev->Get();
+ const auto& record = request->Record;
if (Y_UNLIKELY(!record.HasReadId())) {
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing ReadId");
@@ -1305,6 +1672,23 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
ctx.Send(ev->Sender, result.release());
};
+ if (Pipeline.HasDrop()) {
+ replyWithError(
+ Ydb::StatusIds::INTERNAL_ERROR,
+ TStringBuilder() << "Request " << readId.ReadId << " rejected, because pipeline is in process of drop");
+ return;
+ }
+
+ size_t totalInFly =
+ ReadIteratorsInFly() + TxInFly() + ImmediateInFly()
+ + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting();
+ if (totalInFly > GetMaxTxInFly()) {
+ replyWithError(
+ Ydb::StatusIds::OVERLOADED,
+ TStringBuilder() << "Request " << readId.ReadId << " rejected, MaxTxInFly was exceeded");
+ return;
+ }
+
if (Y_UNLIKELY(ReadIterators.contains(readId))) {
replyWithError(
Ydb::StatusIds::ALREADY_EXISTS,
@@ -1312,16 +1696,46 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
+ if (!request->Keys.empty() && !request->Ranges.empty()) {
+ replyWithError(Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden");
+ return;
+ }
+
+ if (request->Keys.empty() && request->Ranges.empty()) {
+ replyWithError(Ydb::StatusIds::BAD_REQUEST, "Neither keys nor ranges");
+ return;
+ }
+
+ if (record.HasProgram()) {
+ replyWithError(Ydb::StatusIds::BAD_REQUEST, "PushDown is not supported");
+ return;
+ }
+
+ if (record.ColumnsSize() == 0) {
+ replyWithError(Ydb::StatusIds::BAD_REQUEST, "Missing Columns");
+ return;
+ }
+
TRowVersion readVersion = TRowVersion::Max();
if (record.HasSnapshot()) {
readVersion.Step = record.GetSnapshot().GetStep();
readVersion.TxId = record.GetSnapshot().GetTxId();
+ } else if (record.GetTableId().GetOwnerId() != TabletID() && !IsFollower()) {
+ // sys table reads must be from HEAD,
+ // user tables are allowed to be read from HEAD.
+ //
+ // TODO: currently we transform HEAD read to MVCC.
+ // Instead we should try to read from HEAD and if full read
+ // done in single execution - succeed, if not - drop operation
+ // and transform HEAD to MVCC
+ readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr);
+ ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step);
+ ev->Get()->Record.MutableSnapshot()->SetTxId(Max<ui64>());
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed head to: " << readVersion.Step);
}
if (!IsFollower()) {
- // MVCC is not allowed on followers, thus here we check only
- // leader case, snapshot for follower is checked withing transaction,
- // because we need to read from sys table
if (record.GetTableId().GetOwnerId() != TabletID()) {
// owner is schemeshard, read user table
if (readVersion.IsMax()) {
@@ -1339,7 +1753,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
readVersion.TxId);
bool snapshotFound = GetSnapshotManager().FindAvailable(snapshotKey);
- if (!snapshotFound && !IsFollower()) {
+ if (!snapshotFound) {
// check if there is MVCC version and maybe wait
if (readVersion < GetSnapshotManager().GetLowWatermark()) {
replyWithError(
@@ -1381,6 +1795,47 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
TStringBuilder() << "Only HEAD read from sys tables is allowed");
return;
}
+
+ if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) {
+ replyWithError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Only sys tables can be read by localTid, table "
+ << record.GetTableId().GetTableId());
+ return;
+ }
+
+ if (record.GetResultFormat() != NKikimrTxDataShard::CELLVEC) {
+ replyWithError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Sys tables can be read only in cellvec format, but requested "
+ << (int)NKikimrTxDataShard::CELLVEC);
+ return;
+ }
+
+ if (record.GetTableId().HasSchemaVersion()) {
+ replyWithError(
+ Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Requesting system stable owned " << record.GetTableId().GetOwnerId()
+ << ", localTid: " << record.GetTableId().GetTableId()
+ << ", with schema: " << record.GetTableId().GetSchemaVersion());
+ return;
+ }
+ }
+ } else {
+ // follower: we can't check snapshot version, because need to sync and to sync
+ // we need transaction
+ if (readVersion.IsMax()) {
+ replyWithError(
+ Ydb::StatusIds::UNSUPPORTED,
+ "HEAD version on followers is unsupported");
+ return;
+ }
+
+ if (record.GetTableId().GetOwnerId() == TabletID()) {
+ replyWithError(
+ Ydb::StatusIds::UNSUPPORTED,
+ "Systable reads on followers are not supported");
+ return;
}
}
@@ -1405,7 +1860,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
ReadIterators.emplace(readId, new TReadIteratorState(sessionId));
- Executor()->Execute(new TTxRead(this, ev), ctx);
+ Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx);
}
void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 005823dae4a..e0774b9d4c3 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -466,6 +466,7 @@ void TDataShard::CollectCpuUsage(const TActorContext &ctx) {
<< "% is higher than threshold of " << (i64)CpuUsageReportThreshlodPercent
<< "% in-flight Tx: " << TxInFly()
<< " immediate Tx: " << ImmediateInFly()
+ << " readIterators: " << ReadIteratorsInFly()
<< " at datashard: " << TabletID()
<< " table: " << names);
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index a1c79a3a0d8..def43a6bff3 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -185,7 +185,6 @@ class TDataShard
class TTxInitiateStatsUpdate;
class TTxCheckInReadSets;
class TTxRemoveOldInReadSets;
- class TTxRead;
class TTxReadContinue;
class TTxReadColumns;
class TTxGetInfo;
@@ -213,6 +212,9 @@ class TDataShard
class TTxUploadRows;
class TTxEraseRows;
+ class TTxReadViaPipeline;
+ class TReadOperation;
+
ITransaction *CreateTxMonitoring(TDataShard *self,
NMon::TEvRemoteHttpInfo::TPtr ev);
ITransaction *CreateTxGetInfo(TDataShard *self,
@@ -1182,6 +1184,10 @@ public:
ui64 ImmediateInFly() const { return Pipeline.ImmediateInFly(); }
ui64 TxWaiting() const { return Pipeline.WaitingTxs() + Pipeline.WaitingReadIterators(); }
+ // note that not part of ImmediateInFly() to not block scheme ops:
+ // we rather abort iterator if scheme changes between iterations
+ ui64 ReadIteratorsInFly() const { return ReadIterators.size();}
+
inline TRowVersion LastCompleteTxVersion() const {
auto order = Pipeline.GetLastCompleteTx();
return TRowVersion(order.Step, order.TxId);
@@ -1509,8 +1515,17 @@ public:
ui64 GetMaxObservedStep() const;
void SendImmediateWriteResult(
const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0);
- void SendImmediateReadResult(TMonotonic readTime, const TActorId& target, IEventBase* event, ui64 cookie = 0);
- void SendImmediateReadResult(const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ void SendImmediateReadResult(
+ TMonotonic readTime,
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie = 0,
+ const TActorId& sessionId = {});
+ void SendImmediateReadResult(
+ const TActorId& target,
+ IEventBase* event,
+ ui64 cookie = 0,
+ const TActorId& sessionId = {});
void SendAfterMediatorStepActivate(ui64 mediatorStep);
void CheckMediatorStateRestored();
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index 6ec6d7c1021..7b4181b0bc1 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -144,11 +144,11 @@ TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TR
return lock;
}
-TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at, bool brokenIsOK) const {
+TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const {
auto it = Locks.find(lockTxId);
if (it != Locks.end()) {
TLockInfo::TPtr lock = it->second;
- if (!lock->IsBroken(at) || brokenIsOK)
+ if (!lock->IsBroken(at))
return lock;
}
return nullptr;
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 763da40ebeb..8217e6cbbde 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -302,7 +302,7 @@ public:
TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at);
TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at);
TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at);
- TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at, bool brokenIsOK = false) const;
+ TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const;
ui64 LocksCount() const { return Locks.size(); }
ui64 BrokenLocksCount() const { return BrokenLocks.size() + BrokenCandidates.size(); }
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index e9c96bee255..eead6bab8af 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1603,7 +1603,9 @@ void TPipeline::MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const {
bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
// check in-flight limit
- size_t totalInFly = (Self->TxInFly() + Self->ImmediateInFly() + Self->ProposeQueue.Size() + WaitingDataTxOps.size());
+ size_t totalInFly =
+ Self->ReadIteratorsInFly() + Self->TxInFly() + Self->ImmediateInFly()
+ + Self->ProposeQueue.Size() + WaitingDataTxOps.size();
if (totalInFly > Self->GetMaxTxInFly())
return false; // let tx to be rejected
diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h
new file mode 100644
index 00000000000..eb6208e6ddd
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_read_operation.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "datashard_impl.h"
+#include "operation.h"
+
+namespace NKikimr::NDataShard {
+
+class IReadOperation {
+public:
+ virtual ~IReadOperation() = default;
+
+ // our interface for TReadUnit
+ virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) = 0;
+ virtual void SendResult(const TActorContext& ctx) = 0;
+ virtual void Complete(const TActorContext& ctx) = 0;
+
+ // our interface for TCheckReadUnit
+ virtual void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) = 0;
+};
+
+} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 1cba91cbcb6..9d5f98871bd 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -1,8 +1,10 @@
#include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
#include "datashard_active_transaction.h"
#include "read_iterator.h"
#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/tx_proxy/read_table.h>
@@ -25,7 +27,8 @@ void CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
- bool withFollower = false)
+ bool withFollower = false,
+ ui64 shardCount = 1)
{
TVector<TShardedTableOptions::TColumn> columns = {
{"key1", "Uint32", true, false},
@@ -35,7 +38,7 @@ void CreateTable(Tests::TServer::TPtr server,
};
auto opts = TShardedTableOptions()
- .Shards(1)
+ .Shards(shardCount)
.Columns(columns);
if (withFollower)
@@ -288,7 +291,9 @@ struct TTestHelper {
init(serverSettings);
}
- explicit TTestHelper(const TServerSettings& serverSettings) {
+ explicit TTestHelper(const TServerSettings& serverSettings, ui64 shardCount = 1, bool withFollower = false) {
+ WithFollower = withFollower;
+ ShardCount = shardCount;
init(serverSettings);
}
@@ -306,7 +311,7 @@ struct TTestHelper {
auto& table1 = Tables["table-1"];
table1.Name = "table-1";
{
- CreateTable(Server, Sender, "/Root", "table-1", WithFollower);
+ CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/table-1`
(key1, key2, key3, value)
@@ -444,7 +449,8 @@ struct TTestHelper {
const TString& tableName,
TEvDataShard::TEvRead* request,
ui32 node = 0,
- TActorId sender = {})
+ TActorId sender = {},
+ TDuration timeout = TDuration::Max())
{
if (!sender) {
sender = Sender;
@@ -460,7 +466,7 @@ struct TTestHelper {
GetTestPipeConfig(),
table.ClientId);
- return WaitReadResult();
+ return WaitReadResult(timeout);
}
void SendReadAck(
@@ -531,13 +537,24 @@ struct TTestHelper {
auto readResult = SendRead(tableName, request.release());
- UNIT_ASSERT_VALUES_EQUAL(readResult->Record.TxLocksSize(), 0);
- UNIT_ASSERT_VALUES_EQUAL(readResult->Record.BrokenTxLocksSize(), 1);
+ const NKikimrTxDataShard::TLock* prevLock;
+ if (prevResult.Record.TxLocksSize()) {
+ prevLock = &prevResult.Record.GetTxLocks(0);
+ } else {
+ prevLock = &prevResult.Record.GetBrokenTxLocks(0);
+ }
- const auto& lock = prevResult.Record.GetTxLocks(0);
- const auto& brokenLock = readResult->Record.GetBrokenTxLocks(0);
- UNIT_ASSERT_VALUES_EQUAL(lock.GetLockId(), brokenLock.GetLockId());
- UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter());
+ const NKikimrTxDataShard::TLock* newLock;
+ if (readResult->Record.TxLocksSize()) {
+ newLock = &readResult->Record.GetTxLocks(0);
+ } else {
+ newLock = &readResult->Record.GetBrokenTxLocks(0);
+ }
+
+ UNIT_ASSERT(newLock && prevLock);
+ UNIT_ASSERT_VALUES_EQUAL(newLock->GetLockId(), prevLock->GetLockId());
+ UNIT_ASSERT(newLock->GetCounter() != prevLock->GetCounter()
+ || newLock->GetGeneration() != prevLock->GetGeneration());
}
NTabletPipe::TClientConfig GetTestPipeConfig() {
@@ -549,6 +566,7 @@ struct TTestHelper {
public:
bool WithFollower = false;
+ ui64 ShardCount = 1;
Tests::TServer::TPtr Server;
TActorId Sender;
@@ -751,7 +769,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
const auto& record1 = readResult1->Record;
UNIT_ASSERT(!record1.GetLimitReached());
UNIT_ASSERT(record1.HasSeqNo());
- UNIT_ASSERT(!record1.HasFinished());
+ //UNIT_ASSERT(!record1.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL);
// TODO: check continuation token
@@ -1534,6 +1552,43 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
TestReadKey(NKikimrTxDataShard::CELLVEC, true);
}
+ Y_UNIT_TEST(ShouldNotReadMvccFromFollower) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetUseRealThreads(false);
+
+ const ui64 shardCount = 1;
+ TTestHelper helper(serverSettings, shardCount, true);
+
+ TRowVersion someVersion = TRowVersion(10000, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, someVersion);
+ AddKeyQuery(*request, {3, 3, 3});
+ auto readResult = helper.SendRead("table-1", request.release());
+ const auto& record = readResult->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::NOT_FOUND);
+ }
+
+ Y_UNIT_TEST(ShouldNotReadHeadFromFollower) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetUseRealThreads(false);
+
+ const ui64 shardCount = 1;
+ TTestHelper helper(serverSettings, shardCount, true);
+
+ TRowVersion someVersion = TRowVersion(10000, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, someVersion);
+ request->Record.ClearSnapshot();
+ AddKeyQuery(*request, {3, 3, 3});
+ auto readResult = helper.SendRead("table-1", request.release());
+ const auto& record = readResult->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
+ }
+
Y_UNIT_TEST(ShouldStopWhenDisconnected) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
@@ -1568,8 +1623,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
AddKeyQuery(*request1, {3, 3, 3});
AddKeyQuery(*request1, {1, 1, 1});
- // set quota so that DS hangs waiting for ACK
- request1->Record.SetMaxRows(1);
+ request1->Record.SetMaxRows(1); // set quota so that DS hangs waiting for ACK
auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender);
@@ -1584,7 +1638,268 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
}
- Y_UNIT_TEST(ShouldReturnMvccSnapshot) {
+ Y_UNIT_TEST(ShouldReadFromHead) {
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max());
+ request->Record.ClearSnapshot();
+ AddKeyQuery(*request, {3, 3, 3});
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ UNIT_ASSERT(readResult);
+ UNIT_ASSERT(readResult->Record.HasSnapshot());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 300},
+ });
+ }
+
+ Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseNewEngine) {
+ // 1. Start read-write multishard transaction: readset will be blocked
+ // to hang transaction. Write is the key we want to read.
+ // 2a. Check that we can read prior blocked step.
+ // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait
+ // for the hanging transaction.
+ // 3. Finish hanging write.
+ // 4. MVCC read must finish, do another MVCC read of same version for sanity check
+ // that read is repeatable.
+ // 5. Read prior data again
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetUseRealThreads(false);
+
+ const ui64 shardCount = 1;
+ TTestHelper helper(serverSettings, shardCount);
+ const auto& sender = helper.Sender;
+
+ auto& runtime = *helper.Server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG);
+
+ CreateTable(helper.Server, sender, "/Root", "table-2", false, shardCount);
+ ExecSQL(helper.Server, sender, R"(
+ UPSERT INTO `/Root/table-2`
+ (key1, key2, key3, value)
+ VALUES
+ (1, 1, 1, 1000),
+ (3, 3, 3, 3000),
+ (5, 5, 5, 5000),
+ (8, 0, 0, 8000),
+ (8, 0, 1, 8010),
+ (8, 1, 0, 8020),
+ (8, 1, 1, 8030),
+ (11, 11, 11, 11110);
+ )");
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ helper.Server->GetRuntime()->DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ bool capturePlanStep = true;
+ bool dropRS = true;
+
+ ui64 lastPlanStep = 0;
+ TVector<THolder<IEventHandle>> readSets;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
+ switch (event->GetTypeRewrite()) {
+ case TEvTxProcessing::EvPlanStep: {
+ if (capturePlanStep) {
+ auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>();
+ lastPlanStep = planMessage->Record.GetStep();
+ }
+ break;
+ }
+ case TEvTxProcessing::EvReadSet: {
+ if (dropRS) {
+ readSets.push_back(std::move(event));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents);
+
+ capturePlanStep = true;
+
+ // Send SQL request which should hang due to lost RS
+ // We will capture its planstep
+ SendSQL(
+ helper.Server,
+ sender,
+ Q_("UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`"));
+
+ waitFor([&]{ return lastPlanStep != 0; }, "intercepted TEvPlanStep");
+ capturePlanStep = false;
+ const auto hangedStep = lastPlanStep;
+
+ // With mvcc (or a better dependency tracking) the read below may start out-of-order,
+ // because transactions above are stuck before performing any writes. Make sure it's
+ // forced to wait for above transactions by commiting a write that is guaranteed
+ // to "happen" after transactions above.
+ SendSQL(helper.Server, sender, Q_((R"(
+ UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234);
+ UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345);
+ )")));
+
+ waitFor([&]{ return readSets.size() == 1; }, "intercepted RS");
+
+ // 2a: read prior data
+ {
+ auto oldVersion = TRowVersion(hangedStep - 1, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, oldVersion);
+ AddKeyQuery(*request, {3, 3, 3});
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 300}
+ });
+ }
+
+ // 2b-1 (key): try to read hanged step, note that we have hanged write to the same key
+ {
+ auto oldVersion = TRowVersion(hangedStep, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, oldVersion);
+ AddKeyQuery(*request, {3, 3, 3});
+
+ auto readResult = helper.SendRead(
+ "table-1",
+ request.release(),
+ 0,
+ helper.Sender,
+ TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!readResult); // read is blocked by conflicts
+ }
+
+ // 2b-2 (range): try to read hanged step, note that we have hanged write to the same key
+ {
+ auto oldVersion = TRowVersion(hangedStep, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 2, NKikimrTxDataShard::ARROW, oldVersion);
+
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 1, 1},
+ true,
+ {5, 5, 5},
+ true
+ );
+
+ auto readResult = helper.SendRead(
+ "table-1",
+ request.release(),
+ 0,
+ helper.Sender,
+ TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!readResult); // read is blocked by conflicts
+ }
+
+ // 2b-3 (key prefix, equals to range): try to read hanged step, note that we have hanged write to the same key
+ {
+ auto oldVersion = TRowVersion(hangedStep, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 3, NKikimrTxDataShard::ARROW, oldVersion);
+ AddKeyQuery(*request, {3});
+
+ auto readResult = helper.SendRead(
+ "table-1",
+ request.release(),
+ 0,
+ helper.Sender,
+ TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!readResult); // read is blocked by conflicts
+ }
+
+ // 3. Don't catch RS any more and send caught ones to proceed with upserts.
+ runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc);
+ for (auto &rs : readSets)
+ runtime.Send(rs.Release());
+
+ // Wait for upserts and immediate tx to finish.
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(IsTxResultComplete(), 3);
+ runtime.DispatchEvents(options);
+ }
+
+ // read 2b-1 should finish now
+ {
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 3000}
+ });
+ }
+
+ // read 2b-2 should finish now
+ {
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {1, 1, 1, 1000},
+ {3, 3, 3, 3000},
+ {5, 5, 5, 5000}
+ });
+ }
+
+ // read 2b-3 should finish now
+ {
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 3000}
+ });
+ }
+
+ // 4: try to read hanged step again
+ {
+ auto oldVersion = TRowVersion(hangedStep, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 4, NKikimrTxDataShard::ARROW, oldVersion);
+ AddKeyQuery(*request, {3, 3, 3});
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 3000}
+ });
+ }
+
+ // 5: read prior data again
+ {
+ auto oldVersion = TRowVersion(hangedStep - 1, Max<ui64>());
+ auto request = helper.GetBaseReadRequest("table-1", 5, NKikimrTxDataShard::ARROW, oldVersion);
+ AddKeyQuery(*request, {3, 3, 3});
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 300}
+ });
+ }
+ }
+
+ Y_UNIT_TEST(ShouldReturnMvccSnapshotFromFuture) {
+ // checks that when snapshot is in future, we wait for it
+
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
@@ -1801,6 +2116,54 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1);
}
+ Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips) {
+ // If we read in v1, write in v2, then write breaks lock.
+ // Because of out of order execution, v2 can happen before v1
+ // and we should properly handle it in DS to break lock.
+ // Similar to ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips,
+ // but lock is broken during the first iteration.
+
+ TTestHelper helper;
+
+ auto readVersion = CreateVolatileSnapshot(
+ helper.Server,
+ {"/Root/movies", "/Root/table-1"},
+ TDuration::Hours(1));
+
+ // write new data above snapshot
+ ExecSQL(helper.Server, helper.Sender, R"(
+ UPSERT INTO `/Root/table-1`
+ (key1, key2, key3, value)
+ VALUES
+ (4, 4, 4, 4444);
+ )");
+
+ const ui64 lockTxId = 1011121314;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, readVersion);
+ request1->Record.SetLockTxId(lockTxId);
+
+ AddRangeQuery<ui32>(
+ *request1,
+ {1, 1, 1},
+ true,
+ {5, 5, 5},
+ true
+ );
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {1, 1, 1, 100},
+ {3, 3, 3, 300},
+ {5, 5, 5, 500},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 1);
+
+ helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1);
+ }
+
Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeLeftBorder) {
TTestHelper helper;
@@ -1987,6 +2350,67 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter());
}
+ Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips) {
+ // If we read in v1, write in v2, then write breaks lock.
+ // Because of out of order execution, v2 can happen before v1
+ // and we should properly handle it in DS to break lock.
+
+ TTestHelper helper;
+
+ auto readVersion = CreateVolatileSnapshot(
+ helper.Server,
+ {"/Root/movies", "/Root/table-1"},
+ TDuration::Hours(1));
+
+ // write new data above snapshot
+ ExecSQL(helper.Server, helper.Sender, R"(
+ UPSERT INTO `/Root/table-1`
+ (key1, key2, key3, value)
+ VALUES
+ (4, 4, 4, 4444);
+ )");
+
+ const ui64 lockTxId = 1011121314;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, readVersion);
+ request1->Record.SetLockTxId(lockTxId);
+ request1->Record.SetMaxRows(1); // set quota so that DS hangs waiting for ACK
+
+ AddRangeQuery<ui32>(
+ *request1,
+ {1, 1, 1},
+ true,
+ {5, 5, 5},
+ true
+ );
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {1, 1, 1, 100},
+ });
+
+ // we had read only key=1, so didn't see invisible key=4
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0);
+
+ helper.SendReadAck("table-1", readResult1->Record, 100, 10000);
+ auto readResult2 = helper.WaitReadResult();
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
+ {3, 3, 3, 300},
+ {5, 5, 5, 500},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.TxLocksSize(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.BrokenTxLocksSize(), 1UL);
+
+ const auto& lock = readResult1->Record.GetTxLocks(0);
+ const auto& brokenLock = readResult2->Record.GetBrokenTxLocks(0);
+ UNIT_ASSERT_VALUES_EQUAL(lock.GetLockId(), brokenLock.GetLockId());
+ UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter());
+
+ helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1);
+ }
+
Y_UNIT_TEST(HandlePersistentSnapshotGoneInContinue) {
// TODO
}
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index 94f8ffc241a..c4dd0c88622 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -126,6 +126,10 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
return CreateDropCdcStreamUnit(dataShard, pipeline);
case EExecutionUnitKind::MoveIndex:
return CreateMoveIndexUnit(dataShard, pipeline);
+ case EExecutionUnitKind::CheckRead:
+ return CreateCheckReadUnit(dataShard, pipeline);
+ case EExecutionUnitKind::ExecuteRead:
+ return CreateReadUnit(dataShard, pipeline);
default:
Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")");
}
diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h
index ec62449d1aa..e03c77e2c2a 100644
--- a/ydb/core/tx/datashard/execution_unit_ctors.h
+++ b/ydb/core/tx/datashard/execution_unit_ctors.h
@@ -63,6 +63,8 @@ THolder<TExecutionUnit> CreateMoveTableUnit(TDataShard &dataShard, TPipeline &pi
THolder<TExecutionUnit> CreateCreateCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline);
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h
index 4aefa0ca6e3..832854d966a 100644
--- a/ydb/core/tx/datashard/execution_unit_kind.h
+++ b/ydb/core/tx/datashard/execution_unit_kind.h
@@ -10,6 +10,7 @@ enum class EExecutionUnitKind : ui32 {
CheckSnapshotTx,
CheckDistributedEraseTx,
CheckCommitWritesTx,
+ CheckRead,
StoreDataTx,
StoreSchemeTx,
StoreSnapshotTx,
@@ -35,6 +36,7 @@ enum class EExecutionUnitKind : ui32 {
ExecuteKqpDataTx,
ExecuteDistributedEraseTx,
ExecuteCommitWritesTx,
+ ExecuteRead,
CompleteOperation,
ExecuteKqpScanTx,
MakeScanSnapshot,
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 167f265f6a3..6d9a0b5b459 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -105,6 +105,7 @@ enum class EOperationKind : ui32 {
// Values [100, inf) are used for internal kinds.
DirectTx = 101,
+ ReadTx = 102,
};
class TBasicOpInfo {
@@ -149,6 +150,7 @@ public:
EOperationKind GetKind() const { return Kind; }
bool IsDataTx() const { return Kind == EOperationKind::DataTx; }
+ bool IsReadTx() const { return Kind == EOperationKind::ReadTx; }
bool IsDirectTx() const { return Kind == EOperationKind::DirectTx; }
bool IsSchemeTx() const { return Kind == EOperationKind::SchemeTx; }
bool IsReadTable() const { return Kind == EOperationKind::ReadTable; }
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 3bd0fdbb215..45037c3fa00 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -1,6 +1,7 @@
#pragma once
#include "datashard.h"
+#include "datashard_locks.h"
#include <ydb/core/base/row_version.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
@@ -131,6 +132,7 @@ public:
std::vector<NTable::TTag> Columns;
TRowVersion ReadVersion = TRowVersion::Max();
ui64 LockTxId = 0;
+ TLockInfo::TPtr Lock;
bool ReportedLockBroken = false;
// note that will be always overwritten by values from request
@@ -145,6 +147,13 @@ public:
std::shared_ptr<TEvDataShard::TEvRead> Request;
+ // parallel to Request->Keys, but real data only in indices,
+ // where in Request->Keys we have key prefix (here we have properly extended one).
+ TVector<TSerializedCellVec> Keys;
+
+ // same as Keys above, but for Request->Ranges.To
+ TVector<TSerializedCellVec> FromKeys;
+
// State itself //
TQuota Quota;
diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp
new file mode 100644
index 00000000000..c51f1f82868
--- /dev/null
+++ b/ydb/core/tx/datashard/read_op_unit.cpp
@@ -0,0 +1,48 @@
+#include "datashard_read_operation.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+namespace NKikimr::NDataShard {
+
+class TReadUnit : public TExecutionUnit {
+public:
+ TReadUnit(TDataShard& self, TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::ExecuteRead, true, self, pipeline)
+ {
+ }
+
+ ~TReadUnit() = default;
+
+ bool IsReadyToExecute(TOperation::TPtr op) const override {
+ return !op->HasRuntimeConflicts();
+ }
+
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
+ IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get());
+ Y_VERIFY(readOperation);
+
+ if (!readOperation->Execute(txc, ctx)) {
+ return EExecutionStatus::Restart;
+ }
+
+ // TODO: check if we can send result right here to decrease latency
+
+ // note that op has set locks itself, no ApplyLocks() required
+ DataShard.SubscribeNewLocks(ctx);
+
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
+ }
+
+ void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
+ IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get());
+ Y_VERIFY(readOperation);
+
+ readOperation->Complete(ctx);
+ }
+};
+
+THolder<TExecutionUnit> CreateReadUnit(TDataShard& self, TPipeline& pipeline) {
+ return THolder(new TReadUnit(self, pipeline));
+}
+
+} // NKikimr::NDataShard