diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-01-31 17:08:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 17:08:50 +0300 |
commit | a2d8bb5e0dcad006cde22d2f629c4c22fa729eae (patch) | |
tree | 25e207e489bb39458635a73825197ea37ca61034 | |
parent | 736e7ff03b96618cbfcc0556cc9d7a88a4d8af82 (diff) | |
download | ydb-a2d8bb5e0dcad006cde22d2f629c4c22fa729eae.tar.gz |
Fix cdc heartbeats reporting unconfirmed volatile transactions as resolved KIKIMR-20962 (#1473)
-rw-r--r-- | ydb/core/tx/datashard/alter_cdc_stream_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_heartbeat.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/complete_data_tx_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/create_cdc_stream_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__init.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 109 | ||||
-rw-r--r-- | ydb/core/tx/datashard/direct_tx_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_write_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/write_unit.cpp | 2 |
13 files changed, 136 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index 3f47905e976..26aa1f47b49 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -88,8 +88,8 @@ public: return EExecutionStatus::DelayCompleteNoMoreRestarts; } - void Complete(TOperation::TPtr, const TActorContext& ctx) override { - DataShard.EmitHeartbeats(ctx); + void Complete(TOperation::TPtr, const TActorContext&) override { + DataShard.EmitHeartbeats(); } }; diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp index 0e9ccbec844..0560136f72a 100644 --- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp +++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp @@ -1,9 +1,9 @@ #include "cdc_stream_heartbeat.h" #include "datashard_impl.h" -#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) -#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) -#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) namespace NKikimr::NDataShard { @@ -32,7 +32,7 @@ public: TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; } - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + bool Execute(TTransactionContext& txc, const TActorContext&) override { LOG_I("Emit change records" << ": edge# " << Edge << ", at tablet# " << Self->TabletID()); @@ -69,16 +69,16 @@ public: return true; } - void Complete(const TActorContext& ctx) override { + void Complete(const TActorContext&) override { LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)" << ": at tablet# " << Self->TabletID()); Self->EnqueueChangeRecords(std::move(ChangeRecords)); - Self->EmitHeartbeats(ctx); + Self->EmitHeartbeats(); } }; // TTxCdcStreamEmitHeartbeats -void TDataShard::EmitHeartbeats(const TActorContext& ctx) { +void TDataShard::EmitHeartbeats() { LOG_D("Emit heartbeats" << ": at tablet# " << TabletID()); @@ -92,15 +92,23 @@ void TDataShard::EmitHeartbeats(const TActorContext& ctx) { } if (const auto& plan = TransQueue.GetPlan()) { - const auto version = plan.begin()->ToRowVersion(); + const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion()); if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, version), ctx); + return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); } + return; + } + + if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) { + if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { + return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); + } + return; } const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite); if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite), ctx); + return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite)); } WaitPlanStep(lowest.Next().Step); diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp index a078bf487c3..1055068cdfe 100644 --- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp @@ -122,7 +122,7 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op, DataShard.NotifySchemeshard(ctx, op->GetTxId()); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); if (op->HasOutputData()) { const auto& outReadSets = op->OutReadSets(); diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index ef5433e8aba..0bbf0261f5d 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -77,7 +77,7 @@ public: void Complete(TOperation::TPtr, const TActorContext& ctx) override { if (AddSender) { ctx.Send(DataShard.GetChangeSender(), AddSender.Release()); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } } }; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 5433ad33c9f..f71f7231e51 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2289,7 +2289,7 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo PromoteFollowerReadEdge(); } - EmitHeartbeats(ctx); + EmitHeartbeats(); } void TDataShard::CheckMediatorStateRestored() { diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 93fb4ce452a..b00bb254ecd 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -115,7 +115,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) { Self->CreateChangeSender(ctx); Self->EnqueueChangeRecords(std::move(ChangeRecords)); Self->MaybeActivateChangeSender(ctx); - Self->EmitHeartbeats(ctx); + Self->EmitHeartbeats(); if (!Self->ChangesQueue) { if (!Self->ChangeExchangeSplitter.Done()) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index bc3e8a3e433..7720260dce6 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1877,7 +1877,7 @@ public: TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() { return CdcStreamHeartbeatManager; } const TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() const { return CdcStreamHeartbeatManager; } - void EmitHeartbeats(const TActorContext& ctx); + void EmitHeartbeats(); template <typename... Args> bool PromoteCompleteEdge(Args&&... args) { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 97576e84815..601a8c579ec 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,4 +1,5 @@ #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> +#include "datashard_ut_common_kqp.h" #include <ydb/core/base/path.h> #include <ydb/core/change_exchange/change_sender_common_ops.h> @@ -22,6 +23,7 @@ namespace NKikimr { using namespace NDataShard; +using namespace NDataShard::NKqpHelpers; using namespace Tests; Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { @@ -1864,11 +1866,13 @@ Y_UNIT_TEST_SUITE(Cdc) { void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) { while (true) { const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); - if (records.size() == expected.size()) { - for (ui32 i = 0; i < expected.size(); ++i) { - AssertJsonsEqual(records.at(i).second, expected.at(i)); - } + for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) { + AssertJsonsEqual(records.at(i).second, expected.at(i)); + } + if (records.size() >= expected.size()) { + UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(), + "Unexpected record: " << records.at(expected.size()).second); break; } @@ -3157,6 +3161,103 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampsVolatileOutOfOrder) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableDataShardVolatileTransactions(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable()); + CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table1", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table2", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + }); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (1, 10); + UPSERT INTO `/Root/Table2` (key, value) VALUES (2, 20); + )"); + + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":20},"key":[2]})", + R"({"resolved":"***"})", + }); + + // Block readset exchange + std::vector<std::unique_ptr<IEventHandle>> readSets; + auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) { + readSets.emplace_back(ev.Release()); + }); + + // Start a distributed write to both tables + TString sessionId = CreateSessionRPC(runtime, "/Root"); + auto upsertResult = SendRequest( + runtime, + MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (3, 30); + UPSERT INTO `/Root/Table2` (key, value) VALUES (4, 40); + )", sessionId, /* txId */ "", /* commitTx */ true), + "/Root"); + WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets"); + + // Stop blocking further readsets + blockReadSets.Remove(); + + // Start another distributed write to both tables, it should succeed + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (5, 50); + UPSERT INTO `/Root/Table2` (key, value) VALUES (6, 60); + )"); + + runtime.SimulateSleep(TDuration::Seconds(10)); + + // Unblock readsets + for (auto& ev : readSets) { + runtime.Send(ev.release(), 0, true); + } + readSets.clear(); + + // There should be only one resolved timestamp after out of order writes + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"resolved":"***"})", + R"({"update":{"value":50},"key":[5]})", + R"({"update":{"value":30},"key":[3]})", + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":20},"key":[2]})", + R"({"resolved":"***"})", + R"({"update":{"value":60},"key":[6]})", + R"({"update":{"value":40},"key":[4]})", + R"({"resolved":"***"})", + }); + } + Y_UNIT_TEST(SequentialSplitMerge) { TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1); diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index 11e394800cb..b47187c5daa 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -80,7 +80,7 @@ public: void Complete(TOperation::TPtr op, const TActorContext& ctx) override { Pipeline.RemoveCommittingOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get()); Y_ABORT_UNLESS(tx != nullptr); diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 2360bdc4004..7c30cb96009 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -141,7 +141,7 @@ void TFinishProposeUnit::Complete(TOperation::TPtr op, Pipeline.RemoveActiveOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } DataShard.SendRegistrationRequestTimeCast(ctx); diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp index 413f09e0716..d6c9dffa10f 100644 --- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -140,7 +140,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext Pipeline.RemoveActiveOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } DataShard.SendRegistrationRequestTimeCast(ctx); diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 35af1fd255f..edd5ca149d6 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -610,6 +610,8 @@ namespace NKikimr::NDataShard { Self->PromoteFollowerReadEdge(); } + Self->EmitHeartbeats(); + if (!WaitingSnapshotEvents.empty()) { TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr; while (!WaitingSnapshotEvents.empty()) { diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index c469d75b0fd..15d4065b279 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -182,7 +182,7 @@ public: void Complete(TOperation::TPtr op, const TActorContext& ctx) override { Pipeline.RemoveCommittingOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); |