diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-02 19:04:27 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-02 19:04:27 +0300 |
commit | 1fabba6ede21f44b2ee543405ed8115ece5efedd (patch) | |
tree | 96b1f80401090e9d7bb074a954a4adfa4be0c5b7 | |
parent | cddf9ad736065f4d89cf6cd3765d0f1200cebc7e (diff) | |
download | ydb-1fabba6ede21f44b2ee543405ed8115ece5efedd.tar.gz |
try to read from HEAD and convert to MVCC when needed
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 223 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_read_operation.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 328 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_op_unit.cpp | 10 |
6 files changed, 400 insertions, 171 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index e3b87242c2d..0c41df854e7 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1485,7 +1485,7 @@ message TEvGetCompactTableStatsResult { // 4. Shard replies with TEvReadResult, which contains: // - ReadId which is the same as in TEvRead. Shard gives no guarantee // that the ReadId will be valid any time. -// - Snapshot version, which is useful when it wasn't specified in TEvRead. +// - Snapshot version in case of MVCC read or when HEAD read transformed to MVCC, i.e. repeatable read // - SeqNo that should be used by user in TEvReadAck // - ContinuationToken, which user can use to restart the read. // - TxLocks or BrokenTxLocks when user specified LockTxId and LockNodeId. diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index f77c7d55295..cc0c87c6ee3 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -479,8 +479,10 @@ public: record.SetReadId(State.ReadId); record.SetSeqNo(State.SeqNo + 1); - record.MutableSnapshot()->SetStep(State.ReadVersion.Step); - record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId); + if (!State.IsHeadRead) { + record.MutableSnapshot()->SetStep(State.ReadVersion.Step); + record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId); + } NKikimrTxDataShard::TReadContinuationToken continuationToken; continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery); @@ -633,12 +635,12 @@ public: return ValidationInfo; } - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) override { TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { // iterator has been aborted - return true; + return EExecutionStatus::DelayComplete; } Y_VERIFY(it->second); auto& state = *it->second; @@ -662,14 +664,14 @@ public: Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) << ", tablet id: " << Self->TabletID()); - return true; + return EExecutionStatus::DelayComplete; } else { SetStatusError( Result->Record, Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) << ", will be deleted soon, tablet id: " << Self->TabletID()); - return true; + return EExecutionStatus::DelayComplete; } } case TShardState::SplitSrcMakeSnapshot: @@ -681,7 +683,7 @@ public: Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) << ", tablet id: " << Self->TabletID()); - return true; + return EExecutionStatus::DelayComplete; } case TShardState::Uninitialized: case TShardState::WaitScheme: @@ -692,7 +694,7 @@ public: Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Wrong shard state: " << DatashardStateName(Self->State) << ", tablet id: " << Self->TabletID()); - return true; + return EExecutionStatus::DelayComplete; } // we need to check that scheme version is still correct, table presents and @@ -708,7 +710,7 @@ public: state, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << tableId); - return true; + return EExecutionStatus::DelayComplete; } auto& userTableInfo = it->second; @@ -738,7 +740,7 @@ public: << state.ReadVersion << " shard " << Self->TabletID() << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() << (Self->IsFollower() ? " RO replica" : "")); - return true; + return EExecutionStatus::DelayComplete; } if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) { @@ -748,12 +750,44 @@ public: Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion() << ", requested table schemaversion " << state.SchemaVersion); - return true; + return EExecutionStatus::DelayComplete; } } if (!Read(txc, ctx, state)) - return false; + return EExecutionStatus::Restart; + + TDataShard::EPromotePostExecuteEdges readType = TDataShard::EPromotePostExecuteEdges::RepeatableRead; + + if (state.IsHeadRead) { + bool hasError = !Result || Result->Record.HasStatus(); + if (!hasError && Reader->HasUnreadQueries()) { + // we failed to read all at once and also there might be dependency + // we need to wait for: after its execution we can read MVCC snapshot + state.IsHeadRead = false; + + // repeatable read + SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true); + + TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId); + const auto& plannedOps = Self->Pipeline.GetActivePlannedOps(); + auto it = plannedOps.lower_bound(order); + if (it != plannedOps.end() && it->first == order) { + if (!it->second->IsReadOnly()) { + // we need to wait this op + AddDependency(it->second); + + // just for sanity: result should not contain anything at this step + Result.reset(new TEvDataShard::TEvReadResult()); + + return EExecutionStatus::Continue; + } + } + } else { + // either error or full read done + readType = TDataShard::EPromotePostExecuteEdges::ReadOnly; + } + } if (Request->Record.HasLockTxId()) { // note that we set locks only when first read finish transaction, @@ -761,8 +795,10 @@ public: AcquireLock(ctx, state); } - Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); - return true; + if (!Self->IsFollower()) + Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc); + + return EExecutionStatus::DelayComplete; } void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override { @@ -860,7 +896,8 @@ public: SetUsingSnapshotFlag(); } else if (allowMvcc) { snapshotFound = true; - SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId)); + bool isRepeatable = state.IsHeadRead ? false : true; + SetMvccSnapshot(TRowVersion(state.ReadVersion.Step, state.ReadVersion.TxId), isRepeatable); } if (!snapshotFound) { @@ -956,26 +993,11 @@ public: state.Columns.push_back(col); } - { - auto p = CreateBlockBuilder(state, TableInfo); - if (!p.first) { - SendErrorAndAbort( - ctx, - state, - Ydb::StatusIds::BAD_REQUEST, - p.second); - return; - } - std::swap(BlockBuilder, p.first); - } - state.Request = Request; + state.State = TReadIteratorState::EState::Executing; Y_ASSERT(Result); - state.State = TReadIteratorState::EState::Executing; - Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); - PrepareValidationInfo(ctx, state); } @@ -1032,8 +1054,6 @@ public: } void Complete(const TActorContext& ctx) override { - SendResult(ctx); - TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { @@ -1042,13 +1062,17 @@ public: << " has been invalidated before TReadOperation::Complete()"); return; } + auto& state = *it->second; + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Complete read# " << state.ReadId + << " after executionsCount# " << ExecuteCount); + + SendResult(ctx); Y_VERIFY(it->second); // note that we save the state only when there're unread queries if (Reader->HasUnreadQueries()) { - Y_ASSERT(it->second); - auto& state = *it->second; Reader->UpdateState(state); if (!state.IsExhausted()) { ctx.Send( @@ -1352,46 +1376,84 @@ public: auto& state = *it->second; - // If tablet is in follower mode then we should sync scheme - // before we build and check operation. - if (Self->IsFollower()) { - NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; - TString errMessage; + try { + // If tablet is in follower mode then we should sync scheme + // before we build and check operation. + if (Self->IsFollower()) { + NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; + TString errMessage; - if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { - return false; + if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { + return false; + } + + if (status != NKikimrTxDataShard::TError::OK) { + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError( + result->Record, + Ydb::StatusIds::INTERNAL_ERROR, + TStringBuilder() << "Failed to sync follower: " << errMessage); + result->Record.SetReadId(ReadId.ReadId); + SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release()); + + return true; + } } - if (status != NKikimrTxDataShard::TError::OK) { - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); - SetStatusError( - result->Record, - Ydb::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Failed to sync follower: " << errMessage); - result->Record.SetReadId(ReadId.ReadId); - SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release()); + if (Ev) { + const ui64 tieBreaker = Self->NextTieBreakerIndex++; + Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev); + Op->BuildExecutionPlan(false); + Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); - return true; + Ev = nullptr; + Op->IncrementInProgress(); } - } - if (Ev) { - const ui64 tieBreaker = Self->NextTieBreakerIndex++; - Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev); - Op->BuildExecutionPlan(false); - Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); + Y_VERIFY(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty()); - Ev = nullptr; - } + auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() + << ") Execute with status# " << status << " at tablet# " << Self->TabletID()); + + switch (status) { + case EExecutionStatus::Restart: + return false; + + case EExecutionStatus::Reschedule: + // Reschedule transaction as soon as possible + if (!Op->IsExecutionPlanFinished()) { + Op->IncrementInProgress(); + Self->ExecuteProgressTx(Op, ctx); + } + break; + + case EExecutionStatus::Executed: + case EExecutionStatus::Continue: + case EExecutionStatus::WaitComplete: + // No special handling + break; + + default: + Y_FAIL_S("unexpected execution status " << status << " for operation " + << *Op << " " << Op->GetKind() << " at " << Self->TabletID()); + } + + if (CompleteList.empty()) { + Op->DecrementInProgress(); + Op = nullptr; + } - auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx); - if (!CompleteList.empty()) { - return true; - } else if (status == EExecutionStatus::Restart) { - return false; - } else { - Op = nullptr; return true; + } catch (const TSchemeErrorTabletException&) { + Y_FAIL(); + } catch (const TMemoryLimitExceededException&) { + Y_FAIL("there must be no leaked exceptions: TMemoryLimitExceededException"); + } catch (const std::exception &e) { + Y_FAIL("there must be no leaked exceptions: %s", e.what()); + } catch (...) { + Y_FAIL("there must be no leaked exceptions"); } } @@ -1399,10 +1461,21 @@ public: LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadViaPipeline(" << GetTxType() << ") Complete" << ": at tablet# " << Self->TabletID()); - Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); - if (Self->Pipeline.CanRunAnotherOp()) { - Self->PlanQueue.Progress(ctx); + if (!Op) + return; + + Y_VERIFY(!Op->GetExecutionPlan().empty()); + if (!CompleteList.empty()) { + Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); } + + Op->DecrementInProgress(); + + if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) + Self->Pipeline.AddCandidateOp(Op); + + if (Self->Pipeline.CanRunAnotherOp()) + Self->PlanQueue.Progress(ctx); } }; @@ -1704,20 +1777,18 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } TRowVersion readVersion = TRowVersion::Max(); + bool isHeadRead = false; if (record.HasSnapshot()) { readVersion.Step = record.GetSnapshot().GetStep(); readVersion.TxId = record.GetSnapshot().GetTxId(); } else if (record.GetTableId().GetOwnerId() != TabletID() && !IsFollower()) { // sys table reads must be from HEAD, // user tables are allowed to be read from HEAD. - // - // TODO: currently we transform HEAD read to MVCC. - // Instead we should try to read from HEAD and if full read - // done in single execution - succeed, if not - drop operation - // and transform HEAD to MVCC + readVersion = GetMvccTxVersion(EMvccTxMode::ReadOnly, nullptr); ev->Get()->Record.MutableSnapshot()->SetStep(readVersion.Step); - ev->Get()->Record.MutableSnapshot()->SetTxId(Max<ui64>()); + ev->Get()->Record.MutableSnapshot()->SetTxId(readVersion.TxId); + isHeadRead = true; LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " changed head to: " << readVersion.Step); } @@ -1846,7 +1917,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct sessionId = ev->InterconnectSession; } - ReadIterators.emplace(readId, new TReadIteratorState(sessionId)); + ReadIterators.emplace(readId, new TReadIteratorState(sessionId, isHeadRead)); Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx); } diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h index eb6208e6ddd..92fcc8889a6 100644 --- a/ydb/core/tx/datashard/datashard_read_operation.h +++ b/ydb/core/tx/datashard/datashard_read_operation.h @@ -10,7 +10,7 @@ public: virtual ~IReadOperation() = default; // our interface for TReadUnit - virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) = 0; + virtual EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) = 0; virtual void SendResult(const TActorContext& ctx) = 0; virtual void Complete(const TActorContext& ctx) = 0; diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index f4870d0b20b..e7e66842bb2 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -557,6 +557,98 @@ struct TTestHelper { || newLock->GetGeneration() != prevLock->GetGeneration()); } + struct THangedReturn { + ui64 LastPlanStep = 0; + TVector<THolder<IEventHandle>> ReadSets; + }; + + THangedReturn HangWithTransactionWaitingRS(ui64 shardCount, bool finalUpserts = true) { + THangedReturn result; + + auto& runtime = *Server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG); + + CreateTable(Server, Sender, "/Root", "table-2", false, shardCount); + ExecSQL(Server, Sender, R"( + UPSERT INTO `/Root/table-2` + (key1, key2, key3, value) + VALUES + (1, 1, 1, 1000), + (3, 3, 3, 3000), + (5, 5, 5, 5000), + (8, 0, 0, 8000), + (8, 0, 1, 8010), + (8, 1, 0, 8020), + (8, 1, 1, 8030), + (11, 11, 11, 11110); + )"); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + Server->GetRuntime()->DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + bool capturePlanStep = true; + bool dropRS = true; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { + switch (event->GetTypeRewrite()) { + case TEvTxProcessing::EvPlanStep: { + if (capturePlanStep) { + auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>(); + result.LastPlanStep = planMessage->Record.GetStep(); + } + break; + } + case TEvTxProcessing::EvReadSet: { + if (dropRS) { + result.ReadSets.push_back(std::move(event)); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = Server->GetRuntime()->SetObserverFunc(captureEvents); + + capturePlanStep = true; + + // Send SQL request which should hang due to lost RS + // We will capture its planstep + SendSQL( + Server, + Sender, + "UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`"); + + waitFor([&]{ return result.LastPlanStep != 0; }, "intercepted TEvPlanStep"); + capturePlanStep = false; + + if (finalUpserts) { + // With mvcc (or a better dependency tracking) the read below may start out-of-order, + // because transactions above are stuck before performing any writes. Make sure it's + // forced to wait for above transactions by commiting a write that is guaranteed + // to "happen" after transactions above. + SendSQL(Server, Sender, (R"( + UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234); + UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345); + )")); + } + + waitFor([&]{ return result.ReadSets.size() == 1; }, "intercepted RS"); + + return result; + } + NTabletPipe::TClientConfig GetTestPipeConfig() { auto config = GetPipeConfigWithRetries(); if (WithFollower) @@ -1639,6 +1731,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { } Y_UNIT_TEST(ShouldReadFromHead) { + // read from HEAD when there is no conflicting operation TTestHelper helper; auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max()); @@ -1647,116 +1740,178 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto readResult = helper.SendRead("table-1", request.release()); UNIT_ASSERT(readResult); - UNIT_ASSERT(readResult->Record.HasSnapshot()); + UNIT_ASSERT(!readResult->Record.HasSnapshot()); CheckResult(helper.Tables["table-1"].UserTable, *readResult, { {3, 3, 3, 300}, }); } - Y_UNIT_TEST_TWIN(ShouldProperlyOrderConflictingTransactionsMvcc, UseNewEngine) { - // 1. Start read-write multishard transaction: readset will be blocked - // to hang transaction. Write is the key we want to read. - // 2a. Check that we can read prior blocked step. - // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait - // for the hanging transaction. - // 3. Finish hanging write. - // 4. MVCC read must finish, do another MVCC read of same version for sanity check - // that read is repeatable. - // 5. Read prior data again + Y_UNIT_TEST(ShouldReadFromHeadWithConflict) { + // Similar to ShouldReadFromHead, but there is conflicting hanged operation. + // We will read all at once thus should not block TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetEnableMvcc(true) - .SetEnableKqpSessionActor(UseNewEngine) .SetUseRealThreads(false); const ui64 shardCount = 1; TTestHelper helper(serverSettings, shardCount); - const auto& sender = helper.Sender; + auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount, false); + + { + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max()); + request->Record.ClearSnapshot(); + AddKeyQuery(*request, {3, 3, 3}); + AddKeyQuery(*request, {1, 1, 1}); + AddKeyQuery(*request, {5, 5, 5}); + + auto readResult = helper.SendRead( + "table-1", + request.release(), + 0, + helper.Sender, + TDuration::MilliSeconds(100)); + UNIT_ASSERT(readResult); // read is not blocked by conflicts! + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + UNIT_ASSERT(!record.HasSnapshot()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 300}, + {1, 1, 1, 100}, + {5, 5, 5, 500} + }); + } + + // Don't catch RS any more and send caught ones to proceed with upserts. auto& runtime = *helper.Server->GetRuntime(); - runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::KQP_PROXY, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG); + runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc); + for (auto &rs : hangedInfo.ReadSets) + runtime.Send(rs.Release()); - CreateTable(helper.Server, sender, "/Root", "table-2", false, shardCount); - ExecSQL(helper.Server, sender, R"( - UPSERT INTO `/Root/table-2` - (key1, key2, key3, value) - VALUES - (1, 1, 1, 1000), - (3, 3, 3, 3000), - (5, 5, 5, 5000), - (8, 0, 0, 8000), - (8, 0, 1, 8010), - (8, 1, 0, 8020), - (8, 1, 1, 8030), - (11, 11, 11, 11110); - )"); + // Wait for upsert to finish. + { + TDispatchOptions options; + options.FinalEvents.emplace_back(IsTxResultComplete(), 1); + runtime.DispatchEvents(options); + } + } - auto waitFor = [&](const auto& condition, const TString& description) { - if (!condition()) { - Cerr << "... waiting for " << description << Endl; - TDispatchOptions options; - options.CustomFinalCondition = [&]() { - return condition(); - }; - helper.Server->GetRuntime()->DispatchEvents(options); - UNIT_ASSERT_C(condition(), "... failed to wait for " << description); - } - }; + Y_UNIT_TEST(ShouldReadFromHeadToMvccWithConflict) { + // Similar to ShouldProperlyOrderConflictingTransactionsMvcc, but we read HEAD + // + // In this test HEAD read waits conflicting transaction: first time we read from HEAD and + // notice that result it not full. Then restart after conflicting operation finishes - bool capturePlanStep = true; - bool dropRS = true; + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetUseRealThreads(false); - ui64 lastPlanStep = 0; - TVector<THolder<IEventHandle>> readSets; + const ui64 shardCount = 1; + TTestHelper helper(serverSettings, shardCount); - auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { - switch (event->GetTypeRewrite()) { - case TEvTxProcessing::EvPlanStep: { - if (capturePlanStep) { - auto planMessage = event->Get<TEvTxProcessing::TEvPlanStep>(); - lastPlanStep = planMessage->Record.GetStep(); - } - break; - } - case TEvTxProcessing::EvReadSet: { - if (dropRS) { - readSets.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - } - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents); + auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount, false); - capturePlanStep = true; + { + // now read HEAD + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, TRowVersion::Max()); + request->Record.ClearSnapshot(); + AddKeyQuery(*request, {3, 3, 3}); + AddKeyQuery(*request, {1, 1, 1}); + AddKeyQuery(*request, {3, 3, 3}); + AddKeyQuery(*request, {1, 1, 1}); + AddKeyQuery(*request, {5, 5, 5}); + AddKeyQuery(*request, {11, 11, 11}); - // Send SQL request which should hang due to lost RS - // We will capture its planstep - SendSQL( - helper.Server, - sender, - Q_("UPSERT INTO `/Root/table-1` (key1, key2, key3, value) SELECT key1, key2, key3, value FROM `/Root/table-2`")); + // intentionally 2: we check that between Read restart Reader's state is reset. + // Because of implementation we always read 1 + request->Record.SetMaxRowsInResult(2); - waitFor([&]{ return lastPlanStep != 0; }, "intercepted TEvPlanStep"); - capturePlanStep = false; - const auto hangedStep = lastPlanStep; + auto readResult = helper.SendRead( + "table-1", + request.release(), + 0, + helper.Sender, + TDuration::MilliSeconds(100)); + UNIT_ASSERT(!readResult); // read is blocked by conflicts + } + + // Don't catch RS any more and send caught ones to proceed with upserts. + auto& runtime = *helper.Server->GetRuntime(); + runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc); + for (auto &rs : hangedInfo.ReadSets) + runtime.Send(rs.Release()); + + // Wait for upsert to finish. + { + TDispatchOptions options; + options.FinalEvents.emplace_back(IsTxResultComplete(), 1); + runtime.DispatchEvents(options); + } - // With mvcc (or a better dependency tracking) the read below may start out-of-order, - // because transactions above are stuck before performing any writes. Make sure it's - // forced to wait for above transactions by commiting a write that is guaranteed - // to "happen" after transactions above. - SendSQL(helper.Server, sender, Q_((R"( - UPSERT INTO `/Root/table-1` (key1, key2, key3, value) VALUES (11, 11, 11, 11234); - UPSERT INTO `/Root/table-2` (key1, key2, key3, value) VALUES (11, 11, 11, 112345); - )"))); + { + // get1 + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(!record.HasFinished()); + UNIT_ASSERT(record.HasSnapshot()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 3000}, + {1, 1, 1, 1000} + }); + } - waitFor([&]{ return readSets.size() == 1; }, "intercepted RS"); + { + // get2 + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(!record.HasFinished()); + UNIT_ASSERT(record.HasSnapshot()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {3, 3, 3, 3000}, + {1, 1, 1, 1000} + }); + } + + { + // get3 + auto readResult = helper.WaitReadResult(); + const auto& record = readResult->Record; + UNIT_ASSERT(record.HasFinished()); + UNIT_ASSERT(record.HasSnapshot()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {5, 5, 5, 5000}, + {11, 11, 11, 11110} + }); + } + } + + Y_UNIT_TEST(ShouldProperlyOrderConflictingTransactionsMvcc) { + // 1. Start read-write multishard transaction: readset will be blocked + // to hang transaction. Write is the key we want to read. + // 2a. Check that we can read prior blocked step. + // 2b. Do MVCC read of the key, which hanging transaction tries to write. MVCC must wait + // for the hanging transaction. + // 3. Finish hanging write. + // 4. MVCC read must finish, do another MVCC read of same version for sanity check + // that read is repeatable. + // 5. Read prior data again + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetEnableMvcc(true) + .SetUseRealThreads(false); + + const ui64 shardCount = 1; + TTestHelper helper(serverSettings, shardCount); + + auto hangedInfo = helper.HangWithTransactionWaitingRS(shardCount); + auto hangedStep = hangedInfo.LastPlanStep; // 2a: read prior data { @@ -1825,8 +1980,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { } // 3. Don't catch RS any more and send caught ones to proceed with upserts. + auto& runtime = *helper.Server->GetRuntime(); runtime.SetObserverFunc(&TTestActorRuntime::DefaultObserverFunc); - for (auto &rs : readSets) + for (auto &rs : hangedInfo.ReadSets) runtime.Send(rs.Release()); // Wait for upserts and immediate tx to finish. @@ -2506,7 +2662,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { Y_UNIT_TEST(ShouldCalculateQuota) { - NDataShard::TReadIteratorState state({}); + NDataShard::TReadIteratorState state({}, false); state.Quota.Rows = 100; state.Quota.Bytes = 1000; state.ConsumeSeqNo(10, 100); // seqno1 diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 6a0692880fd..dcd4eb6eb94 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -64,8 +64,9 @@ struct TReadIteratorState { }; public: - explicit TReadIteratorState(const TActorId& sessionId) - : SessionId(sessionId) + TReadIteratorState(const TActorId& sessionId, bool isHeadRead) + : IsHeadRead(isHeadRead) + , SessionId(sessionId) {} bool IsExhausted() const { return State == EState::Exhausted; } @@ -155,6 +156,7 @@ public: TPathId PathId; std::vector<NTable::TTag> Columns; TRowVersion ReadVersion = TRowVersion::Max(); + bool IsHeadRead = false; ui64 LockTxId = 0; TLockInfo::TPtr Lock; bool ReportedLockBroken = false; diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp index c51f1f82868..5736675acc9 100644 --- a/ydb/core/tx/datashard/read_op_unit.cpp +++ b/ydb/core/tx/datashard/read_op_unit.cpp @@ -14,23 +14,23 @@ public: ~TReadUnit() = default; bool IsReadyToExecute(TOperation::TPtr op) const override { - return !op->HasRuntimeConflicts(); + return !op->HasRuntimeConflicts() && op->GetDependencies().empty(); } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { IReadOperation* readOperation = dynamic_cast<IReadOperation*>(op.Get()); Y_VERIFY(readOperation); - if (!readOperation->Execute(txc, ctx)) { - return EExecutionStatus::Restart; - } + auto status = readOperation->Execute(txc, ctx); + if (status == EExecutionStatus::Restart || status == EExecutionStatus::Continue) + return status; // TODO: check if we can send result right here to decrease latency // note that op has set locks itself, no ApplyLocks() required DataShard.SubscribeNewLocks(ctx); - return EExecutionStatus::DelayCompleteNoMoreRestarts; + return status; } void Complete(TOperation::TPtr op, const TActorContext& ctx) override { |