| author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-06-06 14:52:35 +0300 |
|---|---|---|
| committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:52:35 +0300 |
| commit | 076884749756a6f2aed1985190ea4083bc3546c1 (patch) | |
| tree | 45ee6f1eb8f7ce1ed3009fc210f823b891c3bdc1 | |
| parent | 3a8539ebcf7796a7127103da89a17ef2892dc6bb (diff) | |
| download | ydb-076884749756a6f2aed1985190ea4083bc3546c1.tar.gz | |
22-2: Make sure to garbage collect trailing immediate writes in the last plan step, KIKIMR-14849
Merge from trunk: r9427621
REVIEW: 2524577
x-ydb-stable-ref: ef7ce3fe8f00d04bf19d1aa9b82964785e1bfb78
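
In short, the shard's MVCC low watermark may now advance past CompleteEdge, up to the latest version that could actually contain data, so trailing immediate writes in the last plan step no longer block garbage collection forever. The following is a minimal C++ sketch of that clamping rule; `TRowVersionLike` and `MaxWriteVersion` are illustrative stand-ins, not the real datashard types or API:

    #include <cstdint>
    #include <limits>

    // Sketch only: stand-in for the datashard's TRowVersion (a Step/TxId pair);
    // the real type lives in ydb/core/tx/datashard.
    struct TRowVersionLike {
        uint64_t Step = 0;
        uint64_t TxId = 0;
    };

    bool operator<(const TRowVersionLike& a, const TRowVersionLike& b) {
        return a.Step != b.Step ? a.Step < b.Step : a.TxId < b.TxId;
    }

    // Cap watermark advancement at the latest version that may contain a write:
    // the very end of the complete edge's step, or the immediate-write edge if
    // that is further ahead. Cleaning beyond this point would be wasted work.
    TRowVersionLike MaxWriteVersion(const TRowVersionLike& completeEdge,
                                    const TRowVersionLike& immediateWriteEdge) {
        TRowVersionLike maxWriteVersion{completeEdge.Step,
                                        std::numeric_limits<uint64_t>::max()};
        if (maxWriteVersion < immediateWriteEdge) {
            maxWriteVersion = immediateWriteEdge;
        }
        return maxWriteVersion;
    }

The watermark target then becomes the minimum of the usual candidates (proposed, least planned, least acquired) and this cap, as the datashard_snapshots.cpp hunk below does with `maxWriteVersion`.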
| -rw-r--r-- | ydb/core/protos/tx_datashard.proto | 6 |
| -rw-r--r-- | ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp | 9 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 8 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_snapshots.cpp | 12 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_split_dst.cpp | 6 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_split_src.cpp | 7 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 306 |
| -rw-r--r-- | ydb/core/tx/datashard/receive_snapshot_unit.cpp | 3 |
8 files changed, 352 insertions, 5 deletions
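
The new protobuf fields (MvccImmediateWriteEdgeStep/TxId and MvccPerformedUnprotectedReads) let a source shard hand its immediate-write edge and unprotected-reads flag to the destination during splits and merges. A hedged sketch of the receive-side merge rule, using simplified stand-in types rather than the real TSnapshotManager API:

    #include <cstdint>
    #include <utility>

    // Sketch only: the real code uses TRowVersion and TSnapshotManager and
    // persists the values through a transaction context.
    using TEdge = std::pair<uint64_t, uint64_t>; // (Step, TxId)

    struct TSnapshotStateSketch {
        TEdge ImmediateWriteEdge{0, 0};
        bool PerformedUnprotectedReads = false;
    };

    // Fold in the state carried by a TEvSplitTransferSnapshot-like message:
    // edges only move forward, and the unprotected-reads flag is sticky.
    void ApplyReceivedSnapshotState(TSnapshotStateSketch& self,
                                    TEdge receivedEdge,
                                    bool receivedUnprotectedReads) {
        if (self.ImmediateWriteEdge < receivedEdge) {
            self.ImmediateWriteEdge = receivedEdge;
        }
        if (receivedUnprotectedReads && !self.PerformedUnprotectedReads) {
            self.PerformedUnprotectedReads = true;
        }
    }

Because the merge is monotonic, applying the same snapshot twice (for example on a retried transfer) is harmless.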
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index d2af64f4d3..001c68d222 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -288,6 +288,8 @@ message TSnapshotTransferReadSet {
     optional uint64 MvccIncompleteEdgeTxId = 7;
     optional uint64 MvccLowWatermarkStep = 8;
     optional uint64 MvccLowWatermarkTxId = 9;
+    optional uint64 MvccImmediateWriteEdgeStep = 10;
+    optional uint64 MvccImmediateWriteEdgeTxId = 11;
 }
 
 message TSnapshotTransferInfo {
@@ -691,6 +693,10 @@ message TEvSplitTransferSnapshot {
 
     // Number of bytes that are in the ReplicationSourceOffsets table
     optional uint64 ReplicationSourceOffsetsBytes = 16;
+
+    optional uint64 MvccImmediateWriteEdgeStep = 17;
+    optional uint64 MvccImmediateWriteEdgeTxId = 18;
+    optional bool MvccPerformedUnprotectedReads = 19;
 }
 
 message TEvSplitTransferSnapshotAck {
diff --git a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
index 7f63253157..8aef437313 100644
--- a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
@@ -79,10 +79,12 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
                                           : TRowVersion::Min();
         TRowVersion incompleteEdge = mvcc ? DataShard.GetSnapshotManager().GetIncompleteEdge()
                                           : TRowVersion::Min();
+        TRowVersion immediateWriteEdge = mvcc ? DataShard.GetSnapshotManager().GetImmediateWriteEdge()
+                                              : TRowVersion::Min();
         TRowVersion lowWatermark = mvcc ? DataShard.GetSnapshotManager().GetLowWatermark()
                                         : TRowVersion::Min();
 
-        if (minVersion || completeEdge || incompleteEdge || lowWatermark)
+        if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark)
             extended = true; // Must use an extended format
 
         if (extended) {
@@ -110,6 +112,11 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
             rs.SetMvccIncompleteEdgeTxId(incompleteEdge.TxId);
         }
 
+        if (immediateWriteEdge) {
+            rs.SetMvccImmediateWriteEdgeStep(immediateWriteEdge.Step);
+            rs.SetMvccImmediateWriteEdgeTxId(immediateWriteEdge.TxId);
+        }
+
         if (lowWatermark) {
             rs.SetMvccLowWatermarkStep(lowWatermark.Step);
             rs.SetMvccLowWatermarkTxId(lowWatermark.TxId);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2a4991939e..0a3100d84d 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -82,11 +82,15 @@ public:
     TSplitSnapshotContext(ui64 txId, TVector<ui32> &&tables,
                           TRowVersion completeEdge = TRowVersion::Min(),
                           TRowVersion incompleteEdge = TRowVersion::Min(),
-                          TRowVersion lowWatermark = TRowVersion::Min())
+                          TRowVersion immediateWriteEdge = TRowVersion::Min(),
+                          TRowVersion lowWatermark = TRowVersion::Min(),
+                          bool performedUnprotectedReads = false)
         : TxId(txId)
         , CompleteEdge(completeEdge)
         , IncompleteEdge(incompleteEdge)
+        , ImmediateWriteEdge(immediateWriteEdge)
         , LowWatermark(lowWatermark)
+        , PerformedUnprotectedReads(performedUnprotectedReads)
         , Tables(tables)
     {}
 
@@ -97,7 +101,9 @@ public:
     ui64 TxId;
     TRowVersion CompleteEdge;
     TRowVersion IncompleteEdge;
+    TRowVersion ImmediateWriteEdge;
    TRowVersion LowWatermark;
+    bool PerformedUnprotectedReads;
 
 private:
     TVector<ui32> Tables;
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index f05563f5a1..f0a3680435 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -638,7 +638,7 @@ TDuration TSnapshotManager::CleanupTimeout() const {
         snapshotTimeout = snapshot->ExpireTime - now;
 
     if (IsMvccEnabled()) {
-        if (LowWatermark == CompleteEdge) {
+        if (LowWatermark >= CompleteEdge) {
             mvccGcTimeout = GetCleanupSnapshotPeriod();
         } else {
             mvccGcTimeout = TInstant::MilliSeconds(LowWatermark.Step + GetKeepSnapshotTimeout() + 1) - now;
@@ -682,6 +682,7 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
 
     ui64 keepSnapshotTimeout = GetKeepSnapshotTimeout();
     TRowVersion proposed = TRowVersion(Max(now.MilliSeconds(), keepSnapshotTimeout) - keepSnapshotTimeout, 0);
+
     TRowVersion leastPlanned = TRowVersion::Max();
     if (auto it = Self->Pipeline.GetPlan().begin(); it != Self->Pipeline.GetPlan().end())
         leastPlanned = TRowVersion(it->Step, it->TxId);
@@ -697,7 +698,14 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
         PromoteCompleteEdge(ImmediateWriteEdgeReplied.Step, txc);
     }
 
-    removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, CompleteEdge));
+    // Calculate the maximum version where we may have written something
+    // Cleaning beyond this point would be a waste of log bandwidth
+    TRowVersion maxWriteVersion(CompleteEdge.Step, Max<ui64>());
+    if (maxWriteVersion < ImmediateWriteEdge) {
+        maxWriteVersion = ImmediateWriteEdge;
+    }
+
+    removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion));
 
     LastAdvanceWatermark = NActors::TActivationContext::Monotonic();
     return removed;
diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index a1df775fbe..6fb7ffda02 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -198,9 +198,15 @@ public:
             TRowVersion incompleteEdge(record.GetMvccIncompleteEdgeStep(), record.GetMvccIncompleteEdgeTxId());
             if (Self->GetSnapshotManager().GetIncompleteEdge() < incompleteEdge)
                 Self->GetSnapshotManager().SetIncompleteEdge(db, incompleteEdge);
+            TRowVersion immediateWriteEdge(record.GetMvccImmediateWriteEdgeStep(), record.GetMvccImmediateWriteEdgeTxId());
+            if (Self->GetSnapshotManager().GetImmediateWriteEdge() < immediateWriteEdge)
+                Self->GetSnapshotManager().SetImmediateWriteEdge(immediateWriteEdge, txc);
             TRowVersion lowWatermark(record.GetMvccLowWatermarkStep(), record.GetMvccLowWatermarkTxId());
             if (Self->GetSnapshotManager().GetLowWatermark() < lowWatermark)
                 Self->GetSnapshotManager().SetLowWatermark(db, lowWatermark);
+            bool performedUnprotectedReads = record.GetMvccPerformedUnprotectedReads();
+            if (!Self->GetSnapshotManager().GetPerformedUnprotectedReads() && performedUnprotectedReads)
+                Self->GetSnapshotManager().SetPerformedUnprotectedReads(true, txc);
         }
 
         // Would be true for the first snapshot we receive, e.g. during a merge
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index a9ccd68001..c9efd503ec 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -178,7 +178,9 @@ public:
             snapContext = new TSplitSnapshotContext(opId, std::move(tablesToSnapshot),
                 Self->GetSnapshotManager().GetCompleteEdge(),
                 Self->GetSnapshotManager().GetIncompleteEdge(),
-                Self->GetSnapshotManager().GetLowWatermark());
+                Self->GetSnapshotManager().GetImmediateWriteEdge(),
+                Self->GetSnapshotManager().GetLowWatermark(),
+                Self->GetSnapshotManager().GetPerformedUnprotectedReads());
         } else {
             snapContext = new TSplitSnapshotContext(opId, std::move(tablesToSnapshot));
         }
@@ -323,6 +325,9 @@ public:
         snapshot->SetMvccCompleteEdgeTxId(SnapContext->CompleteEdge.TxId);
         snapshot->SetMvccIncompleteEdgeStep(SnapContext->IncompleteEdge.Step);
         snapshot->SetMvccIncompleteEdgeTxId(SnapContext->IncompleteEdge.TxId);
+        snapshot->SetMvccImmediateWriteEdgeStep(SnapContext->ImmediateWriteEdge.Step);
+        snapshot->SetMvccImmediateWriteEdgeTxId(SnapContext->ImmediateWriteEdge.TxId);
+        snapshot->SetMvccPerformedUnprotectedReads(SnapContext->PerformedUnprotectedReads);
     }
 
     // Send info about existing persistent snapshots
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 1182c04b33..74b486af39 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1,12 +1,16 @@
 #include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
 #include "datashard_active_transaction.h"
 
 #include <ydb/core/formats/factory.h>
 #include <ydb/core/tx/tx_proxy/proxy.h>
 
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
+
 namespace NKikimr {
 
 using namespace NKikimr::NDataShard;
+using namespace NKikimr::NDataShard::NKqpHelpers;
 using namespace NSchemeShard;
 using namespace Tests;
 
@@ -970,6 +974,308 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
             "key = 3, value = 3\n");
     }
 
+    Y_UNIT_TEST_TWIN(MvccSnapshotTailCleanup, UseNewEngine) {
+        TPortManager pm;
+        TServerSettings::TControls controls;
+        controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+        controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableMvcc(true)
+            .SetEnableMvccSnapshotReads(true)
+            .SetKeepSnapshotTimeout(TDuration::Seconds(2))
+            .SetControls(controls);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            sessionId = CreateSession(runtime, reqSender);
+            auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+            auto& response = ev->Get()->Record.GetRef();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+            txId = response.GetResponse().GetTxMeta().id();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+            return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+        };
+
+        auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+            auto& response = ev->Get()->Record.GetRef();
+            if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+                return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+            }
+            UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+            return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+        };
+
+        auto execSnapshotRequest = [&](const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            TString sessionId, txId;
+            TString result = beginSnapshotRequest(sessionId, txId, query);
+            CloseSession(runtime, reqSender, sessionId);
+            return result;
+        };
+
+        // Start with a snapshot read that persists necessary flags and advances edges for the first time
+        UNIT_ASSERT_VALUES_EQUAL(
+            execSnapshotRequest(Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+        SimulateSleep(runtime, TDuration::Seconds(2));
+
+        // Create a new snapshot, it should still observe the same state
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            beginSnapshotRequest(sessionId, txId, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+
+        // Insert a new row and wait for result, this will roll over into a new step
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+
+        bool failed = false;
+        for (int i = 0; i < 5; ++i) {
+            // Idle cleanup is roughly every 15 seconds
+            SimulateSleep(runtime, TDuration::Seconds(15));
+            auto result = continueSnapshotRequest(sessionId, txId, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )"));
+            if (result.StartsWith("ERROR:")) {
+                Cerr << "... got expected failure: " << result << Endl;
+                failed = true;
+                break;
+            }
+            UNIT_ASSERT_VALUES_EQUAL(
+                result,
+                "Struct { "
+                "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+                "} Struct { Bool: false }");
+        }
+
+        UNIT_ASSERT_C(failed, "Snapshot was not cleaned up");
+    }
+
+    Y_UNIT_TEST_TWIN(MvccSnapshotAndSplit, UseNewEngine) {
+        TPortManager pm;
+        TServerSettings::TControls controls;
+        controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+        controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableMvcc(true)
+            .SetEnableMvccSnapshotReads(true)
+            .SetControls(controls);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        auto execSimpleRequest = [&](const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+            auto& response = ev->Get()->Record.GetRef();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+            UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+            return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+        };
+
+        auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            sessionId = CreateSession(runtime, reqSender);
+            auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+            auto& response = ev->Get()->Record.GetRef();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+            txId = response.GetResponse().GetTxMeta().id();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+            return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+        };
+
+        auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+            auto& response = ev->Get()->Record.GetRef();
+            if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+                return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+            }
+            UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+            return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+        };
+
+        auto execSnapshotRequest = [&](const TString& query) -> TString {
+            auto reqSender = runtime.AllocateEdgeActor();
+            TString sessionId, txId;
+            TString result = beginSnapshotRequest(sessionId, txId, query);
+            CloseSession(runtime, reqSender, sessionId);
+            return result;
+        };
+
+        auto waitFor = [&](const auto& condition, const TString& description) {
+            if (!condition()) {
+                Cerr << "... waiting for " << description << Endl;
+                TDispatchOptions options;
+                options.CustomFinalCondition = [&]() {
+                    return condition();
+                };
+                runtime.DispatchEvents(options);
+                UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+            }
+        };
+
+        // Start with a snapshot read that persists necessary flags and advances edges for the first time
+        UNIT_ASSERT_VALUES_EQUAL(
+            execSnapshotRequest(Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+        SimulateSleep(runtime, TDuration::Seconds(2));
+
+        bool captureSplit = true;
+        bool captureTimecast = false;
+        TVector<THolder<IEventHandle>> capturedSplit;
+        TVector<THolder<IEventHandle>> capturedTimecast;
+        auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+            switch (ev->GetTypeRewrite()) {
+                case TEvDataShard::TEvSplit::EventType: {
+                    if (captureSplit) {
+                        Cerr << "... captured TEvSplit" << Endl;
+                        capturedSplit.emplace_back(ev.Release());
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    break;
+                }
+                case TEvMediatorTimecast::EvUpdate: {
+                    auto update = ev->Get<TEvMediatorTimecast::TEvUpdate>();
+                    auto lastStep = update->Record.GetTimeBarrier();
+                    if (captureTimecast) {
+                        Cerr << "... captured TEvUpdate with step " << lastStep << Endl;
+                        capturedTimecast.emplace_back(ev.Release());
+                        return TTestActorRuntime::EEventAction::DROP;
+                    } else {
+                        Cerr << "... observed TEvUpdate with step " << lastStep << Endl;
+                    }
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+        // Split would fail otherwise :(
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+        // Start splitting table into two shards
+        auto senderSplit = runtime.AllocateEdgeActor();
+        auto tablets = GetTableShards(server, senderSplit, "/Root/table-1");
+        auto splitTxId = AsyncSplitTable(server, senderSplit, "/Root/table-1", tablets.at(0), 4);
+
+        // Wait until schemeshard wants to split the source shard
+        waitFor([&]{ return capturedSplit.size() > 0; }, "captured split");
+
+        // Create a new snapshot and verify initial state
+        // This snapshot must be lightweight and must not advance any edges
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            beginSnapshotRequest(sessionId, txId, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+
+        // Finish the split
+        captureSplit = false;
+        captureTimecast = true;
+        for (auto& ev : capturedSplit) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+        WaitTxNotification(server, senderSplit, splitTxId);
+
+        // Send an immediate write after the finished split
+        // In a buggy case it starts executing despite a blocked timecast
+        auto senderImmediateWrite = runtime.AllocateEdgeActor();
+        SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+            UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
+            )")));
+
+        // We sleep a little so datashard commits changes in buggy case
+        SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+        // Unblock timecast, so datashard time can finally catch up
+        captureTimecast = false;
+
+        // Wait for the commit result
+        {
+            auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite);
+            auto& response = ev->Get()->Record.GetRef();
+            UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        }
+
+        // Snapshot must not have been damaged by the write above
+        UNIT_ASSERT_VALUES_EQUAL(
+            continueSnapshotRequest(sessionId, txId, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+
+        // But new immediate read must observe all writes we have performed
+        UNIT_ASSERT_VALUES_EQUAL(
+            execSimpleRequest(Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                WHERE key in (1, 2, 3)
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+            "} Struct { Bool: false }");
+    }
+
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/receive_snapshot_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
index 26f92f65f4..427a136b34 100644
--- a/ydb/core/tx/datashard/receive_snapshot_unit.cpp
+++ b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
@@ -83,6 +83,9 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op,
         TRowVersion incompleteEdge(rs.GetMvccIncompleteEdgeStep(), rs.GetMvccIncompleteEdgeTxId());
         if (DataShard.GetSnapshotManager().GetIncompleteEdge() < incompleteEdge)
             DataShard.GetSnapshotManager().SetIncompleteEdge(db, incompleteEdge);
+        TRowVersion immediateWriteEdge(rs.GetMvccImmediateWriteEdgeStep(), rs.GetMvccImmediateWriteEdgeTxId());
+        if (DataShard.GetSnapshotManager().GetImmediateWriteEdge() < immediateWriteEdge)
+            DataShard.GetSnapshotManager().SetImmediateWriteEdge(immediateWriteEdge, txc);
         TRowVersion lowWatermark(rs.GetMvccLowWatermarkStep(), rs.GetMvccLowWatermarkTxId());
         if (DataShard.GetSnapshotManager().GetLowWatermark() < lowWatermark)
            DataShard.GetSnapshotManager().SetLowWatermark(db, lowWatermark);