author    Alexey Borzenkov <snaury@yandex-team.ru>    2022-06-06 14:52:35 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru>    2022-06-06 14:52:35 +0300
commit    076884749756a6f2aed1985190ea4083bc3546c1 (patch)
tree      45ee6f1eb8f7ce1ed3009fc210f823b891c3bdc1
parent    3a8539ebcf7796a7127103da89a17ef2892dc6bb (diff)
22-2: Make sure to garbage collect trailing immediate writes in the last plan step, KIKIMR-14849
Merge from trunk: r9427621
REVIEW: 2524577
x-ydb-stable-ref: ef7ce3fe8f00d04bf19d1aa9b82964785e1bfb78
-rw-r--r--  ydb/core/protos/tx_datashard.proto                     |   6
-rw-r--r--  ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp  |   9
-rw-r--r--  ydb/core/tx/datashard/datashard_impl.h                 |   8
-rw-r--r--  ydb/core/tx/datashard/datashard_snapshots.cpp          |  12
-rw-r--r--  ydb/core/tx/datashard/datashard_split_dst.cpp          |   6
-rw-r--r--  ydb/core/tx/datashard/datashard_split_src.cpp          |   7
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_snapshot.cpp        | 306
-rw-r--r--  ydb/core/tx/datashard/receive_snapshot_unit.cpp        |   3
8 files changed, 352 insertions, 5 deletions
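For orientation, the core of the fix is in ydb/core/tx/datashard/datashard_snapshots.cpp below: the MVCC garbage-collection watermark used to be clamped at CompleteEdge, so immediate writes versioned inside the last planned step (same Step, larger TxId) could never be cleaned up. A minimal illustrative sketch of the new clamp, not part of the patch, using a simplified stand-in for the datashard's row version type (ordered by Step, then TxId):

// Illustrative sketch only, not from the patch; TRowVersionSketch is a simplified
// stand-in for the datashard's row version type (ordered by Step, then TxId).
#include <cstdint>
#include <limits>

struct TRowVersionSketch {
    uint64_t Step = 0;
    uint64_t TxId = 0;

    friend bool operator<(const TRowVersionSketch& a, const TRowVersionSketch& b) {
        return a.Step != b.Step ? a.Step < b.Step : a.TxId < b.TxId;
    }
};

// Mirrors the new logic in TSnapshotManager::RemoveExpiredSnapshots: the GC bound
// now covers the whole last planned step plus any later immediate writes, instead
// of stopping exactly at CompleteEdge.
TRowVersionSketch ComputeMaxWriteVersion(const TRowVersionSketch& completeEdge,
                                         const TRowVersionSketch& immediateWriteEdge) {
    TRowVersionSketch maxWriteVersion{completeEdge.Step, std::numeric_limits<uint64_t>::max()};
    if (maxWriteVersion < immediateWriteEdge)
        maxWriteVersion = immediateWriteEdge;
    // The watermark is then advanced to Min(proposed, leastPlanned, leastAcquired, maxWriteVersion).
    return maxWriteVersion;
}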
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index d2af64f4d3..001c68d222 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -288,6 +288,8 @@ message TSnapshotTransferReadSet {
optional uint64 MvccIncompleteEdgeTxId = 7;
optional uint64 MvccLowWatermarkStep = 8;
optional uint64 MvccLowWatermarkTxId = 9;
+ optional uint64 MvccImmediateWriteEdgeStep = 10;
+ optional uint64 MvccImmediateWriteEdgeTxId = 11;
}
message TSnapshotTransferInfo {
@@ -691,6 +693,10 @@ message TEvSplitTransferSnapshot {
// Number of bytes that are in the ReplicationSourceOffsets table
optional uint64 ReplicationSourceOffsetsBytes = 16;
+
+ optional uint64 MvccImmediateWriteEdgeStep = 17;
+ optional uint64 MvccImmediateWriteEdgeTxId = 18;
+ optional bool MvccPerformedUnprotectedReads = 19;
}
message TEvSplitTransferSnapshotAck {
diff --git a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
index 7f63253157..8aef437313 100644
--- a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
@@ -79,10 +79,12 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
: TRowVersion::Min();
TRowVersion incompleteEdge = mvcc ? DataShard.GetSnapshotManager().GetIncompleteEdge()
: TRowVersion::Min();
+ TRowVersion immediateWriteEdge = mvcc ? DataShard.GetSnapshotManager().GetImmediateWriteEdge()
+ : TRowVersion::Min();
TRowVersion lowWatermark = mvcc ? DataShard.GetSnapshotManager().GetLowWatermark() :
TRowVersion::Min();
- if (minVersion || completeEdge || incompleteEdge || lowWatermark)
+ if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark)
extended = true; // Must use an extended format
if (extended) {
@@ -110,6 +112,11 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
rs.SetMvccIncompleteEdgeTxId(incompleteEdge.TxId);
}
+ if (immediateWriteEdge) {
+ rs.SetMvccImmediateWriteEdgeStep(immediateWriteEdge.Step);
+ rs.SetMvccImmediateWriteEdgeTxId(immediateWriteEdge.TxId);
+ }
+
if (lowWatermark) {
rs.SetMvccLowWatermarkStep(lowWatermark.Step);
rs.SetMvccLowWatermarkTxId(lowWatermark.TxId);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2a4991939e..0a3100d84d 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -82,11 +82,15 @@ public:
TSplitSnapshotContext(ui64 txId, TVector<ui32> &&tables,
TRowVersion completeEdge = TRowVersion::Min(),
TRowVersion incompleteEdge = TRowVersion::Min(),
- TRowVersion lowWatermark = TRowVersion::Min())
+ TRowVersion immediateWriteEdge = TRowVersion::Min(),
+ TRowVersion lowWatermark = TRowVersion::Min(),
+ bool performedUnprotectedReads = false)
: TxId(txId)
, CompleteEdge(completeEdge)
, IncompleteEdge(incompleteEdge)
+ , ImmediateWriteEdge(immediateWriteEdge)
, LowWatermark(lowWatermark)
+ , PerformedUnprotectedReads(performedUnprotectedReads)
, Tables(tables)
{}
@@ -97,7 +101,9 @@ public:
ui64 TxId;
TRowVersion CompleteEdge;
TRowVersion IncompleteEdge;
+ TRowVersion ImmediateWriteEdge;
TRowVersion LowWatermark;
+ bool PerformedUnprotectedReads;
private:
TVector<ui32> Tables;
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index f05563f5a1..f0a3680435 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -638,7 +638,7 @@ TDuration TSnapshotManager::CleanupTimeout() const {
snapshotTimeout = snapshot->ExpireTime - now;
if (IsMvccEnabled()) {
- if (LowWatermark == CompleteEdge) {
+ if (LowWatermark >= CompleteEdge) {
mvccGcTimeout = GetCleanupSnapshotPeriod();
} else {
mvccGcTimeout = TInstant::MilliSeconds(LowWatermark.Step + GetKeepSnapshotTimeout() + 1) - now;
@@ -682,6 +682,7 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
ui64 keepSnapshotTimeout = GetKeepSnapshotTimeout();
TRowVersion proposed = TRowVersion(Max(now.MilliSeconds(), keepSnapshotTimeout) - keepSnapshotTimeout, 0);
+
TRowVersion leastPlanned = TRowVersion::Max();
if (auto it = Self->Pipeline.GetPlan().begin(); it != Self->Pipeline.GetPlan().end())
leastPlanned = TRowVersion(it->Step, it->TxId);
@@ -697,7 +698,14 @@ bool TSnapshotManager::RemoveExpiredSnapshots(TInstant now, TTransactionContext&
PromoteCompleteEdge(ImmediateWriteEdgeReplied.Step, txc);
}
- removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, CompleteEdge));
+ // Calculate the maximum version where we may have written something
+ // Cleaning beyond this point would be a waste of log bandwidth
+ TRowVersion maxWriteVersion(CompleteEdge.Step, Max<ui64>());
+ if (maxWriteVersion < ImmediateWriteEdge) {
+ maxWriteVersion = ImmediateWriteEdge;
+ }
+
+ removed |= AdvanceWatermark(txc.DB, Min(proposed, leastPlanned, leastAcquired, maxWriteVersion));
LastAdvanceWatermark = NActors::TActivationContext::Monotonic();
return removed;
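To make the maxWriteVersion comment in the hunk above concrete, a worked example with invented numbers (the values are hypothetical, only their ordering matters):

//   CompleteEdge       = {Step: 1000, TxId: 5}   -- last planned transaction
//   ImmediateWriteEdge = {Step: 1000, TxId: 42}  -- trailing immediate write in the same step
//
// Old bound: Min(..., CompleteEdge) could never exceed {1000, 5}, so rows written
//            at {1000, 42} stayed above the watermark and were never collected.
// New bound: maxWriteVersion = {1000, Max<ui64>()} (already >= ImmediateWriteEdge here),
//            so the watermark can pass every write in step 1000 once the other
//            bounds (proposed, leastPlanned, leastAcquired) allow it.

This also appears to be why CleanupTimeout now compares LowWatermark >= CompleteEdge rather than requiring exact equality: the watermark may legitimately end up above CompleteEdge.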
diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index a1df775fbe..6fb7ffda02 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -198,9 +198,15 @@ public:
TRowVersion incompleteEdge(record.GetMvccIncompleteEdgeStep(), record.GetMvccIncompleteEdgeTxId());
if (Self->GetSnapshotManager().GetIncompleteEdge() < incompleteEdge)
Self->GetSnapshotManager().SetIncompleteEdge(db, incompleteEdge);
+ TRowVersion immediateWriteEdge(record.GetMvccImmediateWriteEdgeStep(), record.GetMvccImmediateWriteEdgeTxId());
+ if (Self->GetSnapshotManager().GetImmediateWriteEdge() < immediateWriteEdge)
+ Self->GetSnapshotManager().SetImmediateWriteEdge(immediateWriteEdge, txc);
TRowVersion lowWatermark(record.GetMvccLowWatermarkStep(), record.GetMvccLowWatermarkTxId());
if (Self->GetSnapshotManager().GetLowWatermark() < lowWatermark)
Self->GetSnapshotManager().SetLowWatermark(db, lowWatermark);
+ bool performedUnprotectedReads = record.GetMvccPerformedUnprotectedReads();
+ if (!Self->GetSnapshotManager().GetPerformedUnprotectedReads() && performedUnprotectedReads)
+ Self->GetSnapshotManager().SetPerformedUnprotectedReads(true, txc);
}
// Would be true for the first snapshot we receive, e.g. during a merge
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index a9ccd68001..c9efd503ec 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -178,7 +178,9 @@ public:
snapContext = new TSplitSnapshotContext(opId, std::move(tablesToSnapshot),
Self->GetSnapshotManager().GetCompleteEdge(),
Self->GetSnapshotManager().GetIncompleteEdge(),
- Self->GetSnapshotManager().GetLowWatermark());
+ Self->GetSnapshotManager().GetImmediateWriteEdge(),
+ Self->GetSnapshotManager().GetLowWatermark(),
+ Self->GetSnapshotManager().GetPerformedUnprotectedReads());
} else {
snapContext = new TSplitSnapshotContext(opId, std::move(tablesToSnapshot));
}
@@ -323,6 +325,9 @@ public:
snapshot->SetMvccCompleteEdgeTxId(SnapContext->CompleteEdge.TxId);
snapshot->SetMvccIncompleteEdgeStep(SnapContext->IncompleteEdge.Step);
snapshot->SetMvccIncompleteEdgeTxId(SnapContext->IncompleteEdge.TxId);
+ snapshot->SetMvccImmediateWriteEdgeStep(SnapContext->ImmediateWriteEdge.Step);
+ snapshot->SetMvccImmediateWriteEdgeTxId(SnapContext->ImmediateWriteEdge.TxId);
+ snapshot->SetMvccPerformedUnprotectedReads(SnapContext->PerformedUnprotectedReads);
}
// Send info about existing persistent snapshots
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 1182c04b33..74b486af39 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1,12 +1,16 @@
#include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
#include "datashard_active_transaction.h"
#include <ydb/core/formats/factory.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
+
namespace NKikimr {
using namespace NKikimr::NDataShard;
+using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;
@@ -970,6 +974,308 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"key = 3, value = 3\n");
}
+ Y_UNIT_TEST_TWIN(MvccSnapshotTailCleanup, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetKeepSnapshotTimeout(TDuration::Seconds(2))
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto execSnapshotRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ TString sessionId, txId;
+ TString result = beginSnapshotRequest(sessionId, txId, query);
+ CloseSession(runtime, reqSender, sessionId);
+ return result;
+ };
+
+ // Start with a snapshot read that persists necessary flags and advances edges for the first time
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ // Create a new snapshot, it should still observe the same state
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Insert a new row and wait for result, this will roll over into a new step
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+
+ bool failed = false;
+ for (int i = 0; i < 5; ++i) {
+ // Idle cleanup is roughly every 15 seconds
+ SimulateSleep(runtime, TDuration::Seconds(15));
+ auto result = continueSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )"));
+ if (result.StartsWith("ERROR:")) {
+ Cerr << "... got expected failure: " << result << Endl;
+ failed = true;
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(
+ result,
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ }
+
+ UNIT_ASSERT_C(failed, "Snapshot was not cleaned up");
+ }
+
+ Y_UNIT_TEST_TWIN(MvccSnapshotAndSplit, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto execSnapshotRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ TString sessionId, txId;
+ TString result = beginSnapshotRequest(sessionId, txId, query);
+ CloseSession(runtime, reqSender, sessionId);
+ return result;
+ };
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ // Start with a snapshot read that persists necessary flags and advances edges for the first time
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ SimulateSleep(runtime, TDuration::Seconds(2));
+
+ bool captureSplit = true;
+ bool captureTimecast = false;
+ TVector<THolder<IEventHandle>> capturedSplit;
+ TVector<THolder<IEventHandle>> capturedTimecast;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvSplit::EventType: {
+ if (captureSplit) {
+ Cerr << "... captured TEvSplit" << Endl;
+ capturedSplit.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvMediatorTimecast::EvUpdate: {
+ auto update = ev->Get<TEvMediatorTimecast::TEvUpdate>();
+ auto lastStep = update->Record.GetTimeBarrier();
+ if (captureTimecast) {
+ Cerr << "... captured TEvUpdate with step " << lastStep << Endl;
+ capturedTimecast.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ } else {
+ Cerr << "... observed TEvUpdate with step " << lastStep << Endl;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ // Split would fail otherwise :(
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+ // Start splitting table into two shards
+ auto senderSplit = runtime.AllocateEdgeActor();
+ auto tablets = GetTableShards(server, senderSplit, "/Root/table-1");
+ auto splitTxId = AsyncSplitTable(server, senderSplit, "/Root/table-1", tablets.at(0), 4);
+
+ // Wait until schemeshard wants to split the source shard
+ waitFor([&]{ return capturedSplit.size() > 0; }, "captured split");
+
+ // Create a new snapshot and verify initial state
+ // This snapshot must be lightweight and must not advance any edges
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Finish the split
+ captureSplit = false;
+ captureTimecast = true;
+ for (auto& ev : capturedSplit) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ WaitTxNotification(server, senderSplit, splitTxId);
+
+ // Send an immediate write after the finished split
+ // In a buggy case it starts executing despite a blocked timecast
+ auto senderImmediateWrite = runtime.AllocateEdgeActor();
+ SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
+ )")));
+
+ // We sleep a little so datashard commits changes in buggy case
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Unblock timecast, so datashard time can finally catch up
+ captureTimecast = false;
+
+ // Wait for the commit result
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // Snapshot must not have been damaged by the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ continueSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // But new immediate read must observe all writes we have performed
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key in (1, 2, 3)
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/receive_snapshot_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
index 26f92f65f4..427a136b34 100644
--- a/ydb/core/tx/datashard/receive_snapshot_unit.cpp
+++ b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
@@ -83,6 +83,9 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op,
TRowVersion incompleteEdge(rs.GetMvccIncompleteEdgeStep(), rs.GetMvccIncompleteEdgeTxId());
if (DataShard.GetSnapshotManager().GetIncompleteEdge() < incompleteEdge)
DataShard.GetSnapshotManager().SetIncompleteEdge(db, incompleteEdge);
+ TRowVersion immediateWriteEdge(rs.GetMvccImmediateWriteEdgeStep(), rs.GetMvccImmediateWriteEdgeTxId());
+ if (DataShard.GetSnapshotManager().GetImmediateWriteEdge() < immediateWriteEdge)
+ DataShard.GetSnapshotManager().SetImmediateWriteEdge(immediateWriteEdge, txc);
TRowVersion lowWatermark(rs.GetMvccLowWatermarkStep(), rs.GetMvccLowWatermarkTxId());
if (DataShard.GetSnapshotManager().GetLowWatermark() < lowWatermark)
DataShard.GetSnapshotManager().SetLowWatermark(db, lowWatermark);
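The destination-side hunks in datashard_split_dst.cpp and receive_snapshot_unit.cpp above both apply the same monotonic merge to the newly transferred fields. A minimal sketch of that pattern, combining the fields handled in the two files and reusing the simplified TRowVersionSketch from the note above (persistence through db/txc is elided):

struct TMvccTransferStateSketch {
    TRowVersionSketch CompleteEdge, IncompleteEdge, ImmediateWriteEdge, LowWatermark;
    bool PerformedUnprotectedReads = false;
};

// Edges only ever move forward: an incoming edge is adopted only when it is ahead
// of the local one, and the unprotected-reads flag is sticky once set.
void MergeTransferredState(TMvccTransferStateSketch& local, const TMvccTransferStateSketch& incoming) {
    auto advance = [](TRowVersionSketch& edge, const TRowVersionSketch& candidate) {
        if (edge < candidate)
            edge = candidate; // the real code also persists the new value
    };
    advance(local.CompleteEdge, incoming.CompleteEdge);
    advance(local.IncompleteEdge, incoming.IncompleteEdge);
    advance(local.ImmediateWriteEdge, incoming.ImmediateWriteEdge);
    advance(local.LowWatermark, incoming.LowWatermark);
    if (!local.PerformedUnprotectedReads && incoming.PerformedUnprotectedReads)
        local.PerformedUnprotectedReads = true;
}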