aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Timizhev <ditimizhev@ydb.tech>2024-12-24 15:04:32 +0300
committerGitHub <noreply@github.com>2024-12-24 15:04:32 +0300
commit596609e6857f25dc2f016db1b0e3eea18d569b1d (patch)
tree9ef5bf3f4691edbfb7fc325168b7d90152f2612b
parentf217802c3f130596ac93e79950111f3f990fc090 (diff)
downloadydb-596609e6857f25dc2f016db1b0e3eea18d569b1d.tar.gz
Add immediate commit & rollback for topics in BufferWriteActor (#12669)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp50
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp40
2 files changed, 70 insertions, 20 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index a1655aa7ed..8dd6dda3a2 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -109,6 +109,23 @@ namespace {
*protoLocks->AddLocks() = lock;
}
}
+
+ void FillTopicsCommit(NKikimrPQ::TDataTransaction& transaction, const NKikimr::NKqp::IKqpTransactionManagerPtr& txManager) {
+ transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);
+ const auto prepareSettings = txManager->GetPrepareTransactionInfo();
+
+ if (!prepareSettings.ArbiterColumnShard) {
+ for (const ui64 sendingShardId : prepareSettings.SendingShards) {
+ transaction.AddSendingShards(sendingShardId);
+ }
+ for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
+ transaction.AddReceivingShards(receivingShardId);
+ }
+ } else {
+ transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
+ transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
+ }
+ }
}
@@ -1594,7 +1611,7 @@ public:
Close();
Process();
SendToExternalShards(false);
- SendToTopics();
+ SendToTopics(false);
}
void ImmediateCommit() {
@@ -1613,6 +1630,7 @@ public:
}
Close();
Process();
+ SendToTopics(true);
}
void DistributedCommit() {
@@ -1640,6 +1658,7 @@ public:
CA_LOG_D("Start rollback");
State = EState::ROLLINGBACK;
SendToExternalShards(true);
+ SendToTopics(true);
}
void SendToExternalShards(bool isRollback) {
@@ -1692,7 +1711,7 @@ public:
}
}
- void SendToTopics() {
+ void SendToTopics(bool isImmediateCommit) {
if (!TxManager->HasTopics()) {
return;
}
@@ -1707,29 +1726,19 @@ public:
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;
- transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);
-
- const auto prepareSettings = TxManager->GetPrepareTransactionInfo();
- if (!prepareSettings.ArbiterColumnShard) {
- for (const ui64 sendingShardId : prepareSettings.SendingShards) {
- transaction.AddSendingShards(sendingShardId);
- }
- for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
- transaction.AddReceivingShards(receivingShardId);
- }
- } else {
- transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
- transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
+
+ if (!isImmediateCommit) {
+ FillTopicsCommit(transaction, TxManager);
}
- auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
-
if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
}
- transaction.SetImmediate(false);
+ transaction.SetImmediate(isImmediateCommit);
+
+ auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
ActorIdToProto(SelfId(), ev->Record.MutableSourceActor());
ev->Record.MutableData()->Swap(&transaction);
@@ -1738,7 +1747,8 @@ public:
SendTime[tabletId] = TInstant::Now();
auto traceId = BufferWriteActor.GetTraceId();
- CA_LOG_D("Preparing KQP transaction on topic tablet: " << tabletId << ", writeId: " << writeId);
+ CA_LOG_D("Executing KQP transaction on topic tablet: " << tabletId
+ << ", writeId: " << writeId << ", isImmediateCommit: " << isImmediateCommit);
Send(
MakePipePerNodeCacheID(false),
@@ -1962,7 +1972,7 @@ public:
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
- } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty() && !TxManager->HasTopics()) {
+ } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) {
TxManager->StartExecute();
ImmediateCommit();
} else {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index 4753630f8e..c9102dad3a 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -2587,6 +2587,24 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopic_4, TFixtureOltpSink)
TestWriteToTopic9();
}
+Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink)
+{
+ CreateTopic("topic_A");
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
+
+ Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
+
+ RollbackTx(tx, EStatus::SUCCESS);
+
+ Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
+}
+
Y_UNIT_TEST_F(OltpSink_WriteToTopics_1, TFixtureOltpSink)
{
TestWriteToTopic1();
@@ -2720,6 +2738,28 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_4, TFixtureOltpSink)
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
}
+
+Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink)
+{
+ CreateTopic("topic_A");
+ CreateTable("/Root/table_A");
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ auto records = MakeTableRecords();
+ WriteToTable("table_A", records, &tx);
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
+
+ RollbackTx(tx, EStatus::SUCCESS);
+
+ Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
+ CheckTabletKeys("topic_A");
+
+ UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0);
+}
}
}