diff options
author | Daniil Timizhev <ditimizhev@ydb.tech> | 2024-12-24 15:04:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-24 15:04:32 +0300 |
commit | 596609e6857f25dc2f016db1b0e3eea18d569b1d (patch) | |
tree | 9ef5bf3f4691edbfb7fc325168b7d90152f2612b | |
parent | f217802c3f130596ac93e79950111f3f990fc090 (diff) | |
download | ydb-596609e6857f25dc2f016db1b0e3eea18d569b1d.tar.gz |
Add immediate commit & rollback for topics in BufferWriteActor (#12669)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 50 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp | 40 |
2 files changed, 70 insertions, 20 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index a1655aa7ed..8dd6dda3a2 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -109,6 +109,23 @@ namespace { *protoLocks->AddLocks() = lock; } } + + void FillTopicsCommit(NKikimrPQ::TDataTransaction& transaction, const NKikimr::NKqp::IKqpTransactionManagerPtr& txManager) { + transaction.SetOp(NKikimrPQ::TDataTransaction::Commit); + const auto prepareSettings = txManager->GetPrepareTransactionInfo(); + + if (!prepareSettings.ArbiterColumnShard) { + for (const ui64 sendingShardId : prepareSettings.SendingShards) { + transaction.AddSendingShards(sendingShardId); + } + for (const ui64 receivingShardId : prepareSettings.ReceivingShards) { + transaction.AddReceivingShards(receivingShardId); + } + } else { + transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard); + transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard); + } + } } @@ -1594,7 +1611,7 @@ public: Close(); Process(); SendToExternalShards(false); - SendToTopics(); + SendToTopics(false); } void ImmediateCommit() { @@ -1613,6 +1630,7 @@ public: } Close(); Process(); + SendToTopics(true); } void DistributedCommit() { @@ -1640,6 +1658,7 @@ public: CA_LOG_D("Start rollback"); State = EState::ROLLINGBACK; SendToExternalShards(true); + SendToTopics(true); } void SendToExternalShards(bool isRollback) { @@ -1692,7 +1711,7 @@ public: } } - void SendToTopics() { + void SendToTopics(bool isImmediateCommit) { if (!TxManager->HasTopics()) { return; } @@ -1707,29 +1726,19 @@ public: for (auto& [tabletId, t] : topicTxs) { auto& transaction = t.tx; - transaction.SetOp(NKikimrPQ::TDataTransaction::Commit); - - const auto prepareSettings = TxManager->GetPrepareTransactionInfo(); - if (!prepareSettings.ArbiterColumnShard) { - for (const ui64 sendingShardId : prepareSettings.SendingShards) { - transaction.AddSendingShards(sendingShardId); - } - for (const ui64 receivingShardId : prepareSettings.ReceivingShards) { - transaction.AddReceivingShards(receivingShardId); - } - } else { - transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard); - transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard); + + if (!isImmediateCommit) { + FillTopicsCommit(transaction, TxManager); } - auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>(); - if (t.hasWrite && writeId.Defined()) { auto* w = transaction.MutableWriteId(); w->SetNodeId(SelfId().NodeId()); w->SetKeyId(*writeId); } - transaction.SetImmediate(false); + transaction.SetImmediate(isImmediateCommit); + + auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>(); ActorIdToProto(SelfId(), ev->Record.MutableSourceActor()); ev->Record.MutableData()->Swap(&transaction); @@ -1738,7 +1747,8 @@ public: SendTime[tabletId] = TInstant::Now(); auto traceId = BufferWriteActor.GetTraceId(); - CA_LOG_D("Preparing KQP transaction on topic tablet: " << tabletId << ", writeId: " << writeId); + CA_LOG_D("Executing KQP transaction on topic tablet: " << tabletId + << ", writeId: " << writeId << ", isImmediateCommit: " << isImmediateCommit); Send( MakePipePerNodeCacheID(false), @@ -1962,7 +1972,7 @@ public: Rollback(); State = EState::FINISHED; Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); - } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty() && !TxManager->HasTopics()) { + } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) { TxManager->StartExecute(); ImmediateCommit(); } else { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 4753630f8e..c9102dad3a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -2587,6 +2587,24 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopic_4, TFixtureOltpSink) TestWriteToTopic9(); } +Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); +} + Y_UNIT_TEST_F(OltpSink_WriteToTopics_1, TFixtureOltpSink) { TestWriteToTopic1(); @@ -2720,6 +2738,28 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_4, TFixtureOltpSink) UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); } + +Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink) +{ + CreateTopic("topic_A"); + CreateTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + CheckTabletKeys("topic_A"); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0); +} } } |