diff options
author | Daniil Timizhev <ditimizhev@ydb.tech> | 2024-12-25 13:41:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-25 13:41:01 +0300 |
commit | 97167ebef4e7b3ec0d5e53a4cf0547aabdc06e40 (patch) | |
tree | 0c07b9baef5a878d95d312caf8d2a2dd88a17bbf | |
parent | 308501bfaf85f7e1c66a2b555f035506a08ccdd5 (diff) | |
download | ydb-97167ebef4e7b3ec0d5e53a4cf0547aabdc06e40.tar.gz |
Add OLAP support with topics in BufferWriteActor (#12686)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 12 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp | 240 |
4 files changed, 211 insertions, 46 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index a5244a8550..46dd289676 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1419,7 +1419,6 @@ public: State = EState::WRITING; Alloc->Release(); Counters->BufferActorsCount->Inc(); - TxManager->AddTopicsToShards(); } void Bootstrap() { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 5a1b29a1ac..f64d2e7507 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1378,6 +1378,7 @@ public: && !txCtx->BufferActorId && (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) { txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations)); + txCtx->TxManager->AddTopicsToShards(); TKqpBufferWriterSettings settings { .SessionActorId = SelfId(), @@ -1388,6 +1389,9 @@ public: }; auto* actor = CreateKqpBufferWriterActor(std::move(settings)); txCtx->BufferActorId = RegisterWithSameMailbox(actor); + } else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) { + txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations)); + txCtx->TxManager->AddTopicsToShards(); } auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index bca668ac3d..1279b80a36 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -231,6 +231,18 @@ namespace Tests { AppConfig->MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); return *this; } + TServerSettings& SetEnableOlapSink(bool withOlapSink) { + AppConfig->MutableTableServiceConfig()->SetEnableOlapSink(withOlapSink); + return *this; + } + TServerSettings& SetEnableHtapTx(bool withHtapTx) { + AppConfig->MutableTableServiceConfig()->SetEnableHtapTx(withHtapTx); + return *this; + } + TServerSettings& SetAllowOlapDataQuery(bool withAllowOlapDataQuery) { + AppConfig->MutableTableServiceConfig()->SetAllowOlapDataQuery(withAllowOlapDataQuery); + return *this; + } // Add additional grpc services diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index c9102dad3a..f33f2b5a0e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -233,6 +233,9 @@ protected: const TString& boundary); virtual bool GetEnableOltpSink() const; + virtual bool GetEnableOlapSink() const; + virtual bool GetEnableHtapTx() const; + virtual bool GetAllowOlapDataQuery() const; private: template<class E> @@ -279,6 +282,9 @@ void TFixture::SetUp(NUnitTest::TTestContext&) settings.SetEnableTopicSplitMerge(true); settings.SetEnablePQConfigTransactionsAtSchemeShard(true); settings.SetEnableOltpSink(GetEnableOltpSink()); + settings.SetEnableOlapSink(GetEnableOlapSink()); + settings.SetEnableHtapTx(GetEnableHtapTx()); + settings.SetAllowOlapDataQuery(GetAllowOlapDataQuery()); Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings); @@ -1406,6 +1412,21 @@ bool TFixture::GetEnableOltpSink() const return false; } +bool TFixture::GetEnableOlapSink() const +{ + return false; +} + +bool TFixture::GetEnableHtapTx() const +{ + return false; +} + +bool TFixture::GetAllowOlapDataQuery() const +{ + return false; +} + Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) { TestWriteToTopic1(); @@ -2557,37 +2578,80 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); } -class TFixtureOltpSink : public TFixture { +class TFixtureSinks : public TFixture { protected: + void CreateRowTable(const TString& path); + void CreateColumnTable(const TString& tablePath); + bool GetEnableOltpSink() const override; + bool GetEnableOlapSink() const override; + bool GetEnableHtapTx() const override; + bool GetAllowOlapDataQuery() const override; }; -bool TFixtureOltpSink::GetEnableOltpSink() const +void TFixtureSinks::CreateRowTable(const TString& path) +{ + CreateTable(path); +} + +void TFixtureSinks::CreateColumnTable(const TString& tablePath) +{ + UNIT_ASSERT(!tablePath.empty()); + + TString path = (tablePath[0] != '/') ? ("/Root/" + tablePath) : tablePath; + + NTable::TSession session = CreateTableSession(); + auto desc = NTable::TTableBuilder() + .SetStoreType(NTable::EStoreType::Column) + .AddNonNullableColumn("key", EPrimitiveType::Utf8) + .AddNonNullableColumn("value", EPrimitiveType::Utf8) + .SetPrimaryKeyColumn("key") + .Build(); + auto result = session.CreateTable(path, std::move(desc)).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +} + +bool TFixtureSinks::GetEnableOltpSink() const +{ + return true; +} + +bool TFixtureSinks::GetEnableOlapSink() const +{ + return true; +} + +bool TFixtureSinks::GetEnableHtapTx() const +{ + return true; +} + +bool TFixtureSinks::GetAllowOlapDataQuery() const { return true; } -Y_UNIT_TEST_F(OltpSink_WriteToTopic_1, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_1, TFixtureSinks) { TestWriteToTopic7(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopic_2, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_2, TFixtureSinks) { TestWriteToTopic10(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopic_3, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_3, TFixtureSinks) { TestWriteToTopic26(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopic_4, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_4, TFixtureSinks) { TestWriteToTopic9(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_5, TFixtureSinks) { CreateTopic("topic_A"); @@ -2605,45 +2669,45 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink) Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); } -Y_UNIT_TEST_F(OltpSink_WriteToTopics_1, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_1, TFixtureSinks) { TestWriteToTopic1(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopics_2, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_2, TFixtureSinks) { TestWriteToTopic27(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopics_3, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_3, TFixtureSinks) { TestWriteToTopic11(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopics_4, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_4, TFixtureSinks) { TestWriteToTopic4(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_1, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_1, TFixtureSinks) { TestWriteToTopic24(); } -Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2, TFixtureSinks) { CreateTopic("topic_A"); CreateTopic("topic_B"); - CreateTable("/Root/table_A"); + CreateRowTable("/Root/table_A"); NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); auto records = MakeTableRecords(); - WriteToTable("table_A", records, &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", &tx); @@ -2651,16 +2715,14 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink) CommitTx(tx, EStatus::SUCCESS); { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records)); + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); } { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - UNIT_ASSERT_VALUES_EQUAL(messages[2], "message #3"); + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #3"); } UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); @@ -2669,25 +2731,24 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink) CheckTabletKeys("topic_B"); } -Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_3, TFixtureSinks) { CreateTopic("topic_A"); CreateTopic("topic_B"); - CreateTable("/Root/table_A"); - CreateTable("/Root/table_B"); + CreateRowTable("/Root/table_A"); + CreateRowTable("/Root/table_B"); NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); auto records = MakeTableRecords(); - WriteToTable("table_A", records, &tx); WriteToTable("table_B", records, &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - - size_t topicMsgCnt = 10; + + const size_t topicMsgCnt = 10; for (size_t i = 1; i <= topicMsgCnt; ++i) { WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx); } @@ -2695,16 +2756,14 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink) CommitTx(tx, EStatus::SUCCESS); { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records)); + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); } { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), topicMsgCnt); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - UNIT_ASSERT_VALUES_EQUAL(messages[topicMsgCnt - 1], "message #" + std::to_string(topicMsgCnt)); + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt)); } UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); @@ -2714,35 +2773,37 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink) CheckTabletKeys("topic_B"); } -Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_4, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_4, TFixtureSinks) { CreateTopic("topic_A"); - CreateTable("/Root/table_A"); + CreateRowTable("/Root/table_A"); NTable::TSession tableSession = CreateTableSession(); - auto records = MakeTableRecords(); - NTable::TTransaction tx1 = BeginTx(tableSession); NTable::TTransaction tx2 = BeginTx(tableSession); ExecuteDataQuery(tableSession, R"(SELECT COUNT(*) FROM `table_A`)", NTable::TTxControl::Tx(tx1)); + + auto records = MakeTableRecords(); WriteToTable("table_A", records, &tx2); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx1); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); CommitTx(tx2, EStatus::SUCCESS); CommitTx(tx1, EStatus::ABORTED); - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - CheckTabletKeys("topic_A"); + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); } -Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink) +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks) { CreateTopic("topic_A"); - CreateTable("/Root/table_A"); + CreateRowTable("/Root/table_A"); NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); @@ -2756,9 +2817,98 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink) RollbackTx(tx, EStatus::SUCCESS); Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0); + CheckTabletKeys("topic_A"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateColumnTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + CreateRowTable("/Root/table_A"); + CreateColumnTable("/Root/table_B"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + + WriteToTable("table_A", records, &tx); + WriteToTable("table_B", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + const size_t topicMsgCnt = 10; + for (size_t i = 1; i <= topicMsgCnt; ++i) { + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx); + } + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt)); + } + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_B"), records.size()); + + CheckTabletKeys("topic_A"); + CheckTabletKeys("topic_B"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateColumnTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0); + + CheckTabletKeys("topic_A"); } } |