about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Aleksei Borzenkov <snaury@ydb.tech> 2024-01-31 17:08:50 +0300
committer GitHub <noreply@github.com> 2024-01-31 17:08:50 +0300
commit a2d8bb5e0dcad006cde22d2f629c4c22fa729eae (patch)
tree 25e207e489bb39458635a73825197ea37ca61034
parent 736e7ff03b96618cbfcc0556cc9d7a88a4d8af82 (diff)
download ydb-a2d8bb5e0dcad006cde22d2f629c4c22fa729eae.tar.gz
Fix cdc heartbeats reporting unconfirmed volatile transactions as resolved KIKIMR-20962 (#1473)
-rw-r--r-- ydb/core/tx/datashard/alter_cdc_stream_unit.cpp 4
-rw-r--r-- ydb/core/tx/datashard/cdc_stream_heartbeat.cpp 28
-rw-r--r-- ydb/core/tx/datashard/complete_data_tx_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/create_cdc_stream_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard__init.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_change_exchange.cpp 109
-rw-r--r-- ydb/core/tx/datashard/direct_tx_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/finish_propose_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/finish_propose_write_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.cpp 2
-rw-r--r-- ydb/core/tx/datashard/write_unit.cpp 2
13 files changed, 136 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index 3f47905e976..26aa1f47b49 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -88,8 +88,8 @@ public:
return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
- void Complete(TOperation::TPtr, const TActorContext& ctx) override {
- DataShard.EmitHeartbeats(ctx);
+ void Complete(TOperation::TPtr, const TActorContext&) override {
+ DataShard.EmitHeartbeats();
}
};
diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
index 0e9ccbec844..0560136f72a 100644
--- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
@@ -1,9 +1,9 @@
#include "cdc_stream_heartbeat.h"
#include "datashard_impl.h"
-#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
-#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
-#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream)
namespace NKikimr::NDataShard {
@@ -32,7 +32,7 @@ public:
TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; }
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
LOG_I("Emit change records"
<< ": edge# " << Edge
<< ", at tablet# " << Self->TabletID());
@@ -69,16 +69,16 @@ public:
return true;
}
- void Complete(const TActorContext& ctx) override {
+ void Complete(const TActorContext&) override {
LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
<< ": at tablet# " << Self->TabletID());
Self->EnqueueChangeRecords(std::move(ChangeRecords));
- Self->EmitHeartbeats(ctx);
+ Self->EmitHeartbeats();
}
}; // TTxCdcStreamEmitHeartbeats
-void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
+void TDataShard::EmitHeartbeats() {
LOG_D("Emit heartbeats"
<< ": at tablet# " << TabletID());
@@ -92,15 +92,23 @@ void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
}
if (const auto& plan = TransQueue.GetPlan()) {
- const auto version = plan.begin()->ToRowVersion();
+ const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
- return Execute(new TTxCdcStreamEmitHeartbeats(this, version), ctx);
+ return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
+ return;
+ }
+
+ if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
+ if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
+ return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
+ }
+ return;
}
const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
- return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite), ctx);
+ return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
}
WaitPlanStep(lowest.Next().Step);
diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
index a078bf487c3..1055068cdfe 100644
--- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
@@ -122,7 +122,7 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op,
DataShard.NotifySchemeshard(ctx, op->GetTxId());
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
if (op->HasOutputData()) {
const auto& outReadSets = op->OutReadSets();
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index ef5433e8aba..0bbf0261f5d 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -77,7 +77,7 @@ public:
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
if (AddSender) {
ctx.Send(DataShard.GetChangeSender(), AddSender.Release());
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
}
}
};
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 5433ad33c9f..f71f7231e51 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2289,7 +2289,7 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
PromoteFollowerReadEdge();
}
- EmitHeartbeats(ctx);
+ EmitHeartbeats();
}
void TDataShard::CheckMediatorStateRestored() {
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index 93fb4ce452a..b00bb254ecd 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -115,7 +115,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) {
Self->CreateChangeSender(ctx);
Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->MaybeActivateChangeSender(ctx);
- Self->EmitHeartbeats(ctx);
+ Self->EmitHeartbeats();
if (!Self->ChangesQueue) {
if (!Self->ChangeExchangeSplitter.Done()) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index bc3e8a3e433..7720260dce6 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1877,7 +1877,7 @@ public:
TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() { return CdcStreamHeartbeatManager; }
const TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() const { return CdcStreamHeartbeatManager; }
- void EmitHeartbeats(const TActorContext& ctx);
+ void EmitHeartbeats();
template <typename... Args>
bool PromoteCompleteEdge(Args&&... args) {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 97576e84815..601a8c579ec 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1,4 +1,5 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
+#include "datashard_ut_common_kqp.h"
#include <ydb/core/base/path.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
@@ -22,6 +23,7 @@
namespace NKikimr {
using namespace NDataShard;
+using namespace NDataShard::NKqpHelpers;
using namespace Tests;
Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
@@ -1864,11 +1866,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
while (true) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
- if (records.size() == expected.size()) {
- for (ui32 i = 0; i < expected.size(); ++i) {
- AssertJsonsEqual(records.at(i).second, expected.at(i));
- }
+ for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
+ AssertJsonsEqual(records.at(i).second, expected.at(i));
+ }
+ if (records.size() >= expected.size()) {
+ UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
+ "Unexpected record: " << records.at(expected.size()).second);
break;
}
@@ -3157,6 +3161,103 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(ResolvedTimestampsVolatileOutOfOrder) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableDataShardVolatileTransactions(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable());
+ CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable());
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table1",
+ WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table2",
+ WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
+ R"({"resolved":"***"})",
+ });
+ WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
+ R"({"resolved":"***"})",
+ });
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table1` (key, value) VALUES (1, 10);
+ UPSERT INTO `/Root/Table2` (key, value) VALUES (2, 20);
+ )");
+
+ WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"resolved":"***"})",
+ });
+ WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"resolved":"***"})",
+ });
+
+ // Block readset exchange
+ std::vector<std::unique_ptr<IEventHandle>> readSets;
+ auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+ readSets.emplace_back(ev.Release());
+ });
+
+ // Start a distributed write to both tables
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto upsertResult = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/Table1` (key, value) VALUES (3, 30);
+ UPSERT INTO `/Root/Table2` (key, value) VALUES (4, 40);
+ )", sessionId, /* txId */ "", /* commitTx */ true),
+ "/Root");
+ WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");
+
+ // Stop blocking further readsets
+ blockReadSets.Remove();
+
+ // Start another distributed write to both tables, it should succeed
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table1` (key, value) VALUES (5, 50);
+ UPSERT INTO `/Root/Table2` (key, value) VALUES (6, 60);
+ )");
+
+ runtime.SimulateSleep(TDuration::Seconds(10));
+
+ // Unblock readsets
+ for (auto& ev : readSets) {
+ runtime.Send(ev.release(), 0, true);
+ }
+ readSets.clear();
+
+ // There should be only one resolved timestamp after out of order writes
+ WaitForContent(server, edgeActor, "/Root/Table1/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"resolved":"***"})",
+ R"({"update":{"value":50},"key":[5]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"resolved":"***"})",
+ });
+ WaitForContent(server, edgeActor, "/Root/Table2/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"resolved":"***"})",
+ R"({"update":{"value":60},"key":[6]})",
+ R"({"update":{"value":40},"key":[4]})",
+ R"({"resolved":"***"})",
+ });
+ }
+
Y_UNIT_TEST(SequentialSplitMerge) {
TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1);
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index 11e394800cb..b47187c5daa 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -80,7 +80,7 @@ public:
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
Pipeline.RemoveCommittingOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_ABORT_UNLESS(tx != nullptr);
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index 2360bdc4004..7c30cb96009 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -141,7 +141,7 @@ void TFinishProposeUnit::Complete(TOperation::TPtr op,
Pipeline.RemoveActiveOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
}
DataShard.SendRegistrationRequestTimeCast(ctx);
diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp
index 413f09e0716..d6c9dffa10f 100644
--- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp
@@ -140,7 +140,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
Pipeline.RemoveActiveOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
}
DataShard.SendRegistrationRequestTimeCast(ctx);
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 35af1fd255f..edd5ca149d6 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -610,6 +610,8 @@ namespace NKikimr::NDataShard {
Self->PromoteFollowerReadEdge();
}
+ Self->EmitHeartbeats();
+
if (!WaitingSnapshotEvents.empty()) {
TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
while (!WaitingSnapshotEvents.empty()) {
diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp
index c469d75b0fd..15d4065b279 100644
--- a/ydb/core/tx/datashard/write_unit.cpp
+++ b/ydb/core/tx/datashard/write_unit.cpp
@@ -182,7 +182,7 @@ public:
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
Pipeline.RemoveCommittingOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
- DataShard.EmitHeartbeats(ctx);
+ DataShard.EmitHeartbeats();
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);