aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-02 19:04:27 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-02 19:04:27 +0300
commit1fabba6ede21f44b2ee543405ed8115ece5efedd (patch)
tree96b1f80401090e9d7bb074a954a4adfa4be0c5b7
parentcddf9ad736065f4d89cf6cd3765d0f1200cebc7e (diff)
downloadydb-1fabba6ede21f44b2ee543405ed8115ece5efedd.tar.gz
try to read from HEAD and convert to MVCC when needed
-rw-r--r--ydb/core/protos/tx_datashard.proto2
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp223
-rw-r--r--ydb/core/tx/datashard/datashard_read_operation.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp328
-rw-r--r--ydb/core/tx/datashard/read_iterator.h6
-rw-r--r--ydb/core/tx/datashard/read_op_unit.cpp10
6 files changed, 400 insertions, 171 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index e3b87242c2d..0c41df854e7 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1485,7 +1485,7 @@ message TEvGetCompactTableStatsResult {
// 4. Shard replies with TEvReadResult, which contains:
// - ReadId which is the same as in TEvRead. Shard gives no guarantee
// that the ReadId will be valid any time.
-// - Snapshot version, which is useful when it wasn't specified in TEvRead.
+// - Snapshot version in case of MVCC read or when HEAD read is transformed to MVCC, i.e. repeatable read
// - SeqNo that should be used by user in TEvReadAck
// - ContinuationToken, which user can use to restart the read.
// - TxLocks or BrokenTxLocks when user specified LockTxId and LockNodeId.
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index f77c7d55295..cc0c87c6ee3 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -479,8 +479,10 @@ public:
record.SetReadId(State.ReadId);
record.SetSeqNo(State.SeqNo + 1);
- record.MutableSnapshot()->SetStep(State.ReadVersion.Step);
- record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId);
+ if (!State.IsHeadRead) {
+ record.MutableSnapshot()->SetStep(State.ReadVersion.Step);
+ record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId);
+ }
NKikimrTxDataShard::TReadContinuationToken continuationToken;
continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery);
@@ -633,12 +635,12 @@ public:
return ValidationInfo;
}
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) override {
TReadIteratorId readId(Sender, Request->Record.GetReadId());
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
// iterator has been aborted
- return true;
+ return EExecutionStatus::DelayComplete;
}
Y_VERIFY(it->second);
auto& state = *it->second;
@@ -662,14 +664,14 @@ public:
Ydb::StatusIds::OVERLOADED,
TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
<< ", tablet id: " << Self->TabletID());
- return true;
+ return EExecutionStatus::DelayComplete;
} else {
SetStatusError(
Result->Record,
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
<< ", will be deleted soon, tablet id: " << Self->TabletID());
- return true;
+ return EExecutionStatus::DelayComplete;
}
}
case TShardState::SplitSrcMakeSnapshot:
@@ -681,7 +683,7 @@ public:
Ydb::StatusIds::OVERLOADED,
TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
<< ", tablet id: " << Self->TabletID());
- return true;
+ return EExecutionStatus::DelayComplete;
}
case TShardState::Uninitialized:
case TShardState::WaitScheme:
@@ -692,7 +694,7 @@ public:
Ydb::StatusIds::INTERNAL_ERROR,
TStringBuilder() << "Wrong shard state: " << DatashardStateName(Self->State)
<< ", tablet id: " << Self->TabletID());
- return true;
+ return EExecutionStatus::DelayComplete;
}
// we need to check that scheme version is still correct, table presents and
@@ -708,7 +710,7 @@ public:
state,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << tableId);
- return true;
+ return EExecutionStatus::DelayComplete;
}
auto& userTableInfo = it->second;
@@ -738,7 +740,7 @@ public:
<< state.ReadVersion << " shard " << Self->TabletID()
<< " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
<< (Self->IsFollower() ? " RO replica" : ""));
- return true;
+ return EExecutionStatus::DelayComplete;
}
if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) {
@@ -748,12 +750,44 @@ public:
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion()
<< ", requested table schemaversion " << state.SchemaVersion);
- return true;
+ return EExecutionStatus::DelayComplete;
}
}
if (!Read(txc, ctx, state))
- return false;
+ return EExecutionStatus::Restart;
+
+ TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead;
+
+ if (state.IsHeadRead) {
+ bool hasError = !Result || Result->Record.HasStatus();
+ if (!hasError && Reader->HasUnreadQueries()) {
+ // we failed to read everything at once, and there might also be a dependency
+ // we need to wait for: after it executes we can read the MVCC snapshot
+ state.IsHeadRead = false;
+
+ // repeatable read
+ SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);
+
+ TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId);
+ const auto& plannedOps = Self->Pipeline.GetActivePlannedOps();
+ auto it = plannedOps.lower_bound(order);
+ if (it != plannedOps.end() && it->first == order) {
+ if (!it->second->IsReadOnly()) {
+ // we need to wait for this op
+ AddDependency(it->second);
+
+ // just for sanity: result should not contain anything at this step
+ Result.reset(new TEvDataShard::TEvReadResult());
+
+ return EExecutionStatus::Continue;
+ }
+ }
+ } else {
+ // either error or full read done
+ readType = TDataShard::EPromotePostExecuteEdges::ReadOnly;
+ }
+ }
if (Request->Record.HasLockTxId()) {
// note that we set locks only when first read finish transaction,
@@ -761,8 +795,10 @@ public:
AcquireLock(ctx, state);
}
- Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
- return true;
+ if (!Self->IsFollower())
+ Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc);
+
+ return EExecutionStatus::DelayComplete;
}
void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override {
@@ -860,7 +896,8 @@ public:
SetUsingSnapshotFlag();
} else if (allowMvcc) {
snapshotFound = true;
- SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId));
+ bool isRepeatable = state.IsHeadRead ? false : true;
+ SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId), isRepeatable);
}
if (!snapshotFound) {
@@ -956,26 +993,11 @@ public:
state.Columns.push_back(col);
}
- {
- auto p = CreateBlockBuilder(state, TableInfo);
- if (!p.first) {
- SendErrorAndAbort(
- ctx,
- state,
- Ydb::StatusIds::BAD_REQUEST,
- p.second);
- return;
- }
- std::swap(BlockBuilder, p.first);
- }
-
state.Request = Request;
+ state.State = TReadIteratorState::EState::Executing;
Y_ASSERT(Result);
- state.State = TReadIteratorState::EState::Executing;
- Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
-
PrepareValidationInfo(ctx, state);
}
@@ -1032,8 +1054,6 @@ public:
}
void Complete(const TActorContext& ctx) override {
- SendResult(ctx);
-
TReadIteratorId readId(Sender, Request->Record.GetReadId());
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
@@ -1042,13 +1062,17 @@ public:
<< " has been invalidated before TReadOperation::Complete()");
return;
}
+ auto& state = *it->second;
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Complete read# " << state.ReadId
+ << " after executionsCount# " << ExecuteCount);
+
+ SendResult(ctx);
Y_VERIFY(it->second);
// note that we save the state only when there're unread queries
if (Reader->HasUnreadQueries()) {
- Y_ASSERT(it->second);
- auto& state = *it->second;
Reader->UpdateState(state);
if (!state.IsExhausted()) {
ctx.Send(
@@ -1352,46 +1376,84 @@ public:
auto& state = *it->second;
- // If tablet is in follower mode then we should sync scheme
- // before we build and check operation.
- if (Self->IsFollower()) {
- NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
- TString errMessage;
+ try {
+ // If tablet is in follower mode then we should sync scheme
+ // before we build and check operation.
+ if (Self->IsFollower()) {
+ NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
+ TString errMessage;
- if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
- return false;
+ if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
+ return false;
+ }
+
+ if (status != NKikimrTxDataShard::TError::OK) {
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(
+ result->Record,
+ Ydb::StatusIds::INTERNAL_ERROR,
+ TStringBuilder() << "Failed to sync follower: " << errMessage);
+ result->Record.SetReadId(ReadId.ReadId);
+ SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release());
+
+ return true;
+ }
}
- if (status != NKikimrTxDataShard::TError::OK) {
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
- SetStatusError(
- result->Record,
- Ydb::StatusIds::INTERNAL_ERROR,
- TStringBuilder() << "Failed to sync follower: " << errMessage);
- result->Record.SetReadId(ReadId.ReadId);
- SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release());
+ if (Ev) {
+ const ui64 tieBreaker = Self->NextTieBreakerIndex++;
+ Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev);
+ Op->BuildExecutionPlan(false);
+ Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
- return true;
+ Ev = nullptr;
+ Op->IncrementInProgress();
}
- }
- if (Ev) {
- const ui64 tieBreaker = Self->NextTieBreakerIndex++;
- Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev);
- Op->BuildExecutionPlan(false);
- Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
+ Y_VERIFY(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty());
- Ev = nullptr;
- }
+ auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx);
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType()
+ << ") Execute with status# " << status << " at tablet# " << Self->TabletID());
+
+ switch (status) {
+ case EExecutionStatus::Restart:
+ return false;
+
+ case EExecutionStatus::Reschedule:
+ // Reschedule transaction as soon as possible
+ if (!Op->IsExecutionPlanFinished()) {
+ Op->IncrementInProgress();
+ Self->ExecuteProgressTx(Op, ctx);
+ }
+ break;
+
+ case EExecutionStatus::Executed:
+ case EExecutionStatus::Continue:
+ case EExecutionStatus::WaitComplete:
+ // No special handling
+ break;
+
+ default:
+ Y_FAIL_S("unexpected execution status " << status << " for operation "
+ << *Op << " " << Op->GetKind() << " at " << Self->TabletID());
+ }
+
+ if (CompleteList.empty()) {
+ Op->DecrementInProgress();
+ Op = nullptr;
+ }
- auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx);
- if (!CompleteList.empty()) {
- return true;
- } else if (status == EExecutionStatus::Restart) {
- return false;
- } else {
- Op = nullptr;
return true;
+ } catch (const TSchemeErrorTabletException&) {
+ Y_FAIL();
+ } catch (const TMemoryLimitExceededException&) {
+ Y_FAIL("there must be no leaked exceptions: TMemoryLimitExceededException");
+ } catch (const std::exception &e) {
+ Y_FAIL("there must be no leaked exceptions: %s", e.what());
+ } catch (...) {
+ Y_FAIL("there must be no leaked exceptions");
}
}
@@ -1399,10 +1461,21 @@ public:
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete"
<< ": at tablet# " << Self->TabletID());
- Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
- if (Self->Pipeline.CanRunAnotherOp()) {
- Self->PlanQueue.Progress(ctx);
+ if (!Op)
+ return;
+
+ Y_VERIFY(!Op->GetExecutionPlan().empty());
+ if (!CompleteList.empty()) {
+ Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
}
+
+ Op->DecrementInProgress();
+
+ if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished())
+ Self->Pipeline.AddCandidateOp(Op);
+
+ if (Self->Pipeline.CanRunAnotherOp())
+ Self->PlanQueue.Progress(ctx);
}
};
@@ -1704,20 +1777,18 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
TRowVersion readVersion = TRowVersion::Max();
+ bool isHeadRead = false;
if (record.HasSnapshot()) {
readVersion.Step = record.GetSnapshot().GetStep();
readVersion.TxId = record.GetSnapshot().GetTxId();
} else if (record.GetTableId().GetOwnerId() != TabletID() && !IsFollower()) {
// sys table reads must be from HEAD,
// user tables are allowed to be read from HEAD.
- //
- // TODO: currently we transform HEAD read to MVCC.
- // Instead we should try to read from HEAD and if full read
- // done in single execution - succeed, if not - drop operation
- // and transform HEAD to MVCC
+
readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr);
ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step);
- ev->Get()->Record.MutableSnapshot()->SetTxId(Max<ui64>());
+ ev->Get()->Record.MutableSnapshot()->SetTxId(readVersion.TxId);
+ isHeadRead = true;
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed head to: " << readVersion.Step);
}
@@ -1846,7 +1917,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
sessionId = ev->InterconnectSession;
}
- ReadIterators.emplace(readId, new TReadIteratorState(sessionId));
+ ReadIterators.emplace(readId, new TReadIteratorState(sessionId, isHeadRead));
Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx);
}
diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h
index eb6208e6ddd..92fcc8889a6 100644
--- a/ydb/core/tx/datashard/datashard_read_operation.h
+++ b/ydb/core/tx/datashard/datashard_read_operation.h
@@ -10,7 +10,7 @@ public:
virtual ~IReadOperation() = default;
// our interface for TReadUnit
- virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) = 0;
+ virtual EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) = 0;
virtual void SendResult(const TActorContext& ctx) = 0;
virtual void Complete(const TActorContext& ctx) = 0;
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index f4870d0b20b..e7e66842bb2 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -557,6 +557,98 @@ struct TTestHelper {
|| newLock->GetGeneration() != prevLock->GetGeneration());
}
+ struct THangedReturn {
+ ui64 LastPlanStep = 0;
+ TVector<THolder<IEventHandle>> ReadSets;
+ };
+
+ THangedReturn HangWithTransactionWaitingRS(ui64 shardCount, bool finalUpserts = true) {
+ THangedReturn result;
+
+ auto& runtime = *Server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG);
+
+ CreateTable(Server, Sender, "/Root", "table-2", false, shardCount);
+ ExecSQL(Server, Sender, R"(
+ UPSERT INTO `/Root/table-2`
+ (key1, key2, key3, value)
+ VALUES
+ (1, 1, 1, 1000),
+ (3, 3, 3, 3000),
+ (5, 5, 5, 5000),
+ (8, 0, 0, 8000),
+ (8, 0, 1, 8010),
+ (8, 1, 0, 8020),
+ (8, 1, 1, 8030),
+ (11, 11, 11, 11110);
+ )");
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ Server->GetRuntime()->DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ bool capturePlanStep = true;
+ bool dropRS = true;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
+ switch (event->GetTypeRewrite()) {
+ case TEvTxProcessing::EvPlanStep: {
+ if (capturePlanStep) {
+ auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>();
+ result.LastPlanStep = planMessage->Record.GetStep();
+ }
+ break;
+ }
+ case TEvTxProcessing::EvReadSet: {
+ if (dropRS) {
+ result.ReadSets.push_back(std::move(event));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = Server->GetRuntime()->SetObserverFunc(captureEvents);
+
+ capturePlanStep = true;
+
+ // Send SQL request which should hang due to lost RS
+ // We will capture its planstep
+ SendSQL(
+ Server,
+ Sender,
+ "UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`");
+
+ waitFor([&]{ return result.LastPlanStep != 0; }, "intercepted TEvPlanStep");
+ capturePlanStep = false;
+
+ if (finalUpserts) {
+ // With mvcc (or a better dependency tracking) the read below may start out-of-order,
+ // because transactions above are stuck before performing any writes. Make sure it's
+ // forced to wait for above transactions by committing a write that is guaranteed
+ // to "happen" after transactions above.
+ SendSQL(Server, Sender, (R"(
+ UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234);
+ UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345);
+ )"));
+ }
+
+ waitFor([&]{ return result.ReadSets.size() == 1; }, "intercepted RS");
+
+ return result;
+ }
+
NTabletPipe::TClientConfig GetTestPipeConfig() {
auto config = GetPipeConfigWithRetries();
if (WithFollower)
@@ -1639,6 +1731,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
}
Y_UNIT_TEST(ShouldReadFromHead) {
+ // read from HEAD when there is no conflicting operation
TTestHelper helper;
auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max());
@@ -1647,116 +1740,178 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto readResult = helper.SendRead("table-1", request.release());
UNIT_ASSERT(readResult);
- UNIT_ASSERT(readResult->Record.HasSnapshot());
+ UNIT_ASSERT(!readResult->Record.HasSnapshot());
CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
{3, 3, 3, 300},
});
}
- Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseNewEngine) {
- // 1. Start read-write multishard transaction: readset will be blocked
- // to hang transaction. Write is the key we want to read.
- // 2a. Check that we can read prior blocked step.
- // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait
- // for the hanging transaction.
- // 3. Finish hanging write.
- // 4. MVCC read must finish, do another MVCC read of same version for sanity check
- // that read is repeatable.
- // 5. Read prior data again
+ Y_UNIT_TEST(ShouldReadFromHeadWithConflict) {
+ // Similar to ShouldReadFromHead, but there is conflicting hanged operation.
+ // We will read all at once thus should not block
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetEnableMvcc(true)
- .SetEnableKqpSessionActor(UseNewEngine)
.SetUseRealThreads(false);
const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
- const auto& sender = helper.Sender;
+ auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount, false);
+
+ {
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max());
+ request->Record.ClearSnapshot();
+ AddKeyQuery(*request, {3, 3, 3});
+ AddKeyQuery(*request, {1, 1, 1});
+ AddKeyQuery(*request, {5, 5, 5});
+
+ auto readResult = helper.SendRead(
+ "table-1",
+ request.release(),
+ 0,
+ helper.Sender,
+ TDuration::MilliSeconds(100));
+ UNIT_ASSERT(readResult); // read is not blocked by conflicts!
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ UNIT_ASSERT(!record.HasSnapshot());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 300},
+ {1, 1, 1, 100},
+ {5, 5, 5, 500}
+ });
+ }
+
+ // Don't catch RS any more and send caught ones to proceed with upserts.
auto& runtime = *helper.Server->GetRuntime();
- runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG);
- runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG);
+ runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc);
+ for (auto &rs : hangedInfo.ReadSets)
+ runtime.Send(rs.Release());
- CreateTable(helper.Server, sender, "/Root", "table-2", false, shardCount);
- ExecSQL(helper.Server, sender, R"(
- UPSERT INTO `/Root/table-2`
- (key1, key2, key3, value)
- VALUES
- (1, 1, 1, 1000),
- (3, 3, 3, 3000),
- (5, 5, 5, 5000),
- (8, 0, 0, 8000),
- (8, 0, 1, 8010),
- (8, 1, 0, 8020),
- (8, 1, 1, 8030),
- (11, 11, 11, 11110);
- )");
+ // Wait for upsert to finish.
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(IsTxResultComplete(), 1);
+ runtime.DispatchEvents(options);
+ }
+ }
- auto waitFor = [&](const auto& condition, const TString& description) {
- if (!condition()) {
- Cerr << "... waiting for " << description << Endl;
- TDispatchOptions options;
- options.CustomFinalCondition = [&]() {
- return condition();
- };
- helper.Server->GetRuntime()->DispatchEvents(options);
- UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
- }
- };
+ Y_UNIT_TEST(ShouldReadFromHeadToMvccWithConflict) {
+ // Similar to ShouldProperlyOrderConflictingTransactionsMvcc, but we read HEAD
+ //
+ // In this test the HEAD read waits for a conflicting transaction: the first time we read from HEAD and
+ // notice that the result is not full. Then we restart after the conflicting operation finishes
- bool capturePlanStep = true;
- bool dropRS = true;
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetUseRealThreads(false);
- ui64 lastPlanStep = 0;
- TVector<THolder<IEventHandle>> readSets;
+ const ui64 shardCount = 1;
+ TTestHelper helper(serverSettings, shardCount);
- auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
- switch (event->GetTypeRewrite()) {
- case TEvTxProcessing::EvPlanStep: {
- if (capturePlanStep) {
- auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>();
- lastPlanStep = planMessage->Record.GetStep();
- }
- break;
- }
- case TEvTxProcessing::EvReadSet: {
- if (dropRS) {
- readSets.push_back(std::move(event));
- return TTestActorRuntime::EEventAction::DROP;
- }
- break;
- }
- }
- return TTestActorRuntime::EEventAction::PROCESS;
- };
- auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents);
+ auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount, false);
- capturePlanStep = true;
+ {
+ // now read HEAD
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max());
+ request->Record.ClearSnapshot();
+ AddKeyQuery(*request, {3, 3, 3});
+ AddKeyQuery(*request, {1, 1, 1});
+ AddKeyQuery(*request, {3, 3, 3});
+ AddKeyQuery(*request, {1, 1, 1});
+ AddKeyQuery(*request, {5, 5, 5});
+ AddKeyQuery(*request, {11, 11, 11});
- // Send SQL request which should hang due to lost RS
- // We will capture its planstep
- SendSQL(
- helper.Server,
- sender,
- Q_("UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`"));
+ // intentionally 2: we check that between Read restart Reader's state is reset.
+ // Because of implementation we always read 1
+ request->Record.SetMaxRowsInResult(2);
- waitFor([&]{ return lastPlanStep != 0; }, "intercepted TEvPlanStep");
- capturePlanStep = false;
- const auto hangedStep = lastPlanStep;
+ auto readResult = helper.SendRead(
+ "table-1",
+ request.release(),
+ 0,
+ helper.Sender,
+ TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!readResult); // read is blocked by conflicts
+ }
+
+ // Don't catch RS any more and send caught ones to proceed with upserts.
+ auto& runtime = *helper.Server->GetRuntime();
+ runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc);
+ for (auto &rs : hangedInfo.ReadSets)
+ runtime.Send(rs.Release());
+
+ // Wait for upsert to finish.
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(IsTxResultComplete(), 1);
+ runtime.DispatchEvents(options);
+ }
- // With mvcc (or a better dependency tracking) the read below may start out-of-order,
- // because transactions above are stuck before performing any writes. Make sure it's
- // forced to wait for above transactions by commiting a write that is guaranteed
- // to "happen" after transactions above.
- SendSQL(helper.Server, sender, Q_((R"(
- UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234);
- UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345);
- )")));
+ {
+ // get1
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(!record.HasFinished());
+ UNIT_ASSERT(record.HasSnapshot());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 3000},
+ {1, 1, 1, 1000}
+ });
+ }
- waitFor([&]{ return readSets.size() == 1; }, "intercepted RS");
+ {
+ // get2
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(!record.HasFinished());
+ UNIT_ASSERT(record.HasSnapshot());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {3, 3, 3, 3000},
+ {1, 1, 1, 1000}
+ });
+ }
+
+ {
+ // get3
+ auto readResult = helper.WaitReadResult();
+ const auto& record = readResult->Record;
+ UNIT_ASSERT(record.HasFinished());
+ UNIT_ASSERT(record.HasSnapshot());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {5, 5, 5, 5000},
+ {11, 11, 11, 11110}
+ });
+ }
+ }
+
+ Y_UNIT_TEST(ShouldProperlyOrderConflictingTransactionsMvcc) {
+ // 1. Start read-write multishard transaction: readset will be blocked
+ // to hang transaction. Write is the key we want to read.
+ // 2a. Check that we can read prior blocked step.
+ // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait
+ // for the hanging transaction.
+ // 3. Finish hanging write.
+ // 4. MVCC read must finish, do another MVCC read of same version for sanity check
+ // that read is repeatable.
+ // 5. Read prior data again
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetUseRealThreads(false);
+
+ const ui64 shardCount = 1;
+ TTestHelper helper(serverSettings, shardCount);
+
+ auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount);
+ auto hangedStep = hangedInfo.LastPlanStep;
// 2a: read prior data
{
@@ -1825,8 +1980,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
}
// 3. Don't catch RS any more and send caught ones to proceed with upserts.
+ auto& runtime = *helper.Server->GetRuntime();
runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc);
- for (auto &rs : readSets)
+ for (auto &rs : hangedInfo.ReadSets)
runtime.Send(rs.Release());
// Wait for upserts and immediate tx to finish.
@@ -2506,7 +2662,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
Y_UNIT_TEST(ShouldCalculateQuota) {
- NDataShard::TReadIteratorState state({});
+ NDataShard::TReadIteratorState state({}, false);
state.Quota.Rows = 100;
state.Quota.Bytes = 1000;
state.ConsumeSeqNo(10, 100); // seqno1
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 6a0692880fd..dcd4eb6eb94 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -64,8 +64,9 @@ struct TReadIteratorState {
};
public:
- explicit TReadIteratorState(const TActorId& sessionId)
- : SessionId(sessionId)
+ TReadIteratorState(const TActorId& sessionId, bool isHeadRead)
+ : IsHeadRead(isHeadRead)
+ , SessionId(sessionId)
{}
bool IsExhausted() const { return State == EState::Exhausted; }
@@ -155,6 +156,7 @@ public:
TPathId PathId;
std::vector<NTable::TTag> Columns;
TRowVersion ReadVersion = TRowVersion::Max();
+ bool IsHeadRead = false;
ui64 LockTxId = 0;
TLockInfo::TPtr Lock;
bool ReportedLockBroken = false;
diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp
index c51f1f82868..5736675acc9 100644
--- a/ydb/core/tx/datashard/read_op_unit.cpp
+++ b/ydb/core/tx/datashard/read_op_unit.cpp
@@ -14,23 +14,23 @@ public:
~TReadUnit() = default;
bool IsReadyToExecute(TOperation::TPtr op) const override {
- return !op->HasRuntimeConflicts();
+ return !op->HasRuntimeConflicts() && op->GetDependencies().empty();
}
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get());
Y_VERIFY(readOperation);
- if (!readOperation->Execute(txc, ctx)) {
- return EExecutionStatus::Restart;
- }
+ auto status = readOperation->Execute(txc, ctx);
+ if (status == EExecutionStatus::Restart || status == EExecutionStatus::Continue)
+ return status;
// TODO: check if we can send result right here to decrease latency
// note that op has set locks itself, no ApplyLocks() required
DataShard.SubscribeNewLocks(ctx);
- return EExecutionStatus::DelayCompleteNoMoreRestarts;
+ return status;
}
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {