aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Timizhev <ditimizhev@ydb.tech>2024-12-25 13:41:01 +0300
committerGitHub <noreply@github.com>2024-12-25 13:41:01 +0300
commit97167ebef4e7b3ec0d5e53a4cf0547aabdc06e40 (patch)
tree0c07b9baef5a878d95d312caf8d2a2dd88a17bbf
parent308501bfaf85f7e1c66a2b555f035506a08ccdd5 (diff)
downloadydb-97167ebef4e7b3ec0d5e53a4cf0547aabdc06e40.tar.gz
Add OLAP support with topics in BufferWriteActor (#12686)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp4
-rw-r--r--ydb/core/testlib/test_client.h12
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp240
4 files changed, 211 insertions, 46 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index a5244a8550..46dd289676 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1419,7 +1419,6 @@ public:
State = EState::WRITING;
Alloc->Release();
Counters->BufferActorsCount->Inc();
- TxManager->AddTopicsToShards();
}
void Bootstrap() {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 5a1b29a1ac..f64d2e7507 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1378,6 +1378,7 @@ public:
&& !txCtx->BufferActorId
&& (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) {
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
+ txCtx->TxManager->AddTopicsToShards();
TKqpBufferWriterSettings settings {
.SessionActorId = SelfId(),
@@ -1388,6 +1389,9 @@ public:
};
auto* actor = CreateKqpBufferWriterActor(std::move(settings));
txCtx->BufferActorId = RegisterWithSameMailbox(actor);
+ } else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) {
+ txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
+ txCtx->TxManager->AddTopicsToShards();
}
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index bca668ac3d..1279b80a36 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -231,6 +231,18 @@ namespace Tests {
AppConfig->MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
return *this;
}
+ TServerSettings& SetEnableOlapSink(bool withOlapSink) {
+ AppConfig->MutableTableServiceConfig()->SetEnableOlapSink(withOlapSink);
+ return *this;
+ }
+ TServerSettings& SetEnableHtapTx(bool withHtapTx) {
+ AppConfig->MutableTableServiceConfig()->SetEnableHtapTx(withHtapTx);
+ return *this;
+ }
+ TServerSettings& SetAllowOlapDataQuery(bool withAllowOlapDataQuery) {
+ AppConfig->MutableTableServiceConfig()->SetAllowOlapDataQuery(withAllowOlapDataQuery);
+ return *this;
+ }
// Add additional grpc services
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index c9102dad3a..f33f2b5a0e 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -233,6 +233,9 @@ protected:
const TString& boundary);
virtual bool GetEnableOltpSink() const;
+ virtual bool GetEnableOlapSink() const;
+ virtual bool GetEnableHtapTx() const;
+ virtual bool GetAllowOlapDataQuery() const;
private:
template<class E>
@@ -279,6 +282,9 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
settings.SetEnableTopicSplitMerge(true);
settings.SetEnablePQConfigTransactionsAtSchemeShard(true);
settings.SetEnableOltpSink(GetEnableOltpSink());
+ settings.SetEnableOlapSink(GetEnableOlapSink());
+ settings.SetEnableHtapTx(GetEnableHtapTx());
+ settings.SetAllowOlapDataQuery(GetAllowOlapDataQuery());
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
@@ -1406,6 +1412,21 @@ bool TFixture::GetEnableOltpSink() const
return false;
}
+bool TFixture::GetEnableOlapSink() const
+{
+ return false;
+}
+
+bool TFixture::GetEnableHtapTx() const
+{
+ return false;
+}
+
+bool TFixture::GetAllowOlapDataQuery() const
+{
+ return false;
+}
+
Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture)
{
TestWriteToTopic1();
@@ -2557,37 +2578,80 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
}
-class TFixtureOltpSink : public TFixture {
+class TFixtureSinks : public TFixture {
protected:
+ void CreateRowTable(const TString& path);
+ void CreateColumnTable(const TString& tablePath);
+
bool GetEnableOltpSink() const override;
+ bool GetEnableOlapSink() const override;
+ bool GetEnableHtapTx() const override;
+ bool GetAllowOlapDataQuery() const override;
};
-bool TFixtureOltpSink::GetEnableOltpSink() const
+void TFixtureSinks::CreateRowTable(const TString& path)
+{
+ CreateTable(path);
+}
+
+void TFixtureSinks::CreateColumnTable(const TString& tablePath)
+{
+ UNIT_ASSERT(!tablePath.empty());
+
+ TString path = (tablePath[0] != '/') ? ("/Root/" + tablePath) : tablePath;
+
+ NTable::TSession session = CreateTableSession();
+ auto desc = NTable::TTableBuilder()
+ .SetStoreType(NTable::EStoreType::Column)
+ .AddNonNullableColumn("key", EPrimitiveType::Utf8)
+ .AddNonNullableColumn("value", EPrimitiveType::Utf8)
+ .SetPrimaryKeyColumn("key")
+ .Build();
+ auto result = session.CreateTable(path, std::move(desc)).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+}
+
+bool TFixtureSinks::GetEnableOltpSink() const
+{
+ return true;
+}
+
+bool TFixtureSinks::GetEnableOlapSink() const
+{
+ return true;
+}
+
+bool TFixtureSinks::GetEnableHtapTx() const
+{
+ return true;
+}
+
+bool TFixtureSinks::GetAllowOlapDataQuery() const
{
return true;
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopic_1, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_1, TFixtureSinks)
{
TestWriteToTopic7();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopic_2, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_2, TFixtureSinks)
{
TestWriteToTopic10();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopic_3, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_3, TFixtureSinks)
{
TestWriteToTopic26();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopic_4, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_4, TFixtureSinks)
{
TestWriteToTopic9();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_5, TFixtureSinks)
{
CreateTopic("topic_A");
@@ -2605,45 +2669,45 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopic_5, TFixtureOltpSink)
Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopics_1, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_1, TFixtureSinks)
{
TestWriteToTopic1();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopics_2, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_2, TFixtureSinks)
{
TestWriteToTopic27();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopics_3, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_3, TFixtureSinks)
{
TestWriteToTopic11();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopics_4, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_4, TFixtureSinks)
{
TestWriteToTopic4();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_1, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_1, TFixtureSinks)
{
TestWriteToTopic24();
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2, TFixtureSinks)
{
CreateTopic("topic_A");
CreateTopic("topic_B");
- CreateTable("/Root/table_A");
+ CreateRowTable("/Root/table_A");
NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);
auto records = MakeTableRecords();
-
WriteToTable("table_A", records, &tx);
+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
-
+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", &tx);
@@ -2651,16 +2715,14 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink)
CommitTx(tx, EStatus::SUCCESS);
{
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records));
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
}
{
- auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
- UNIT_ASSERT_VALUES_EQUAL(messages[2], "message #3");
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1");
+ UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #3");
}
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
@@ -2669,25 +2731,24 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_2, TFixtureOltpSink)
CheckTabletKeys("topic_B");
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_3, TFixtureSinks)
{
CreateTopic("topic_A");
CreateTopic("topic_B");
- CreateTable("/Root/table_A");
- CreateTable("/Root/table_B");
+ CreateRowTable("/Root/table_A");
+ CreateRowTable("/Root/table_B");
NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);
auto records = MakeTableRecords();
-
WriteToTable("table_A", records, &tx);
WriteToTable("table_B", records, &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
-
- size_t topicMsgCnt = 10;
+
+ const size_t topicMsgCnt = 10;
for (size_t i = 1; i <= topicMsgCnt; ++i) {
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx);
}
@@ -2695,16 +2756,14 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink)
CommitTx(tx, EStatus::SUCCESS);
{
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records));
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
}
{
- auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), topicMsgCnt);
- UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
- UNIT_ASSERT_VALUES_EQUAL(messages[topicMsgCnt - 1], "message #" + std::to_string(topicMsgCnt));
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1");
+ UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt));
}
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
@@ -2714,35 +2773,37 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_3, TFixtureOltpSink)
CheckTabletKeys("topic_B");
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_4, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_4, TFixtureSinks)
{
CreateTopic("topic_A");
- CreateTable("/Root/table_A");
+ CreateRowTable("/Root/table_A");
NTable::TSession tableSession = CreateTableSession();
- auto records = MakeTableRecords();
-
NTable::TTransaction tx1 = BeginTx(tableSession);
NTable::TTransaction tx2 = BeginTx(tableSession);
ExecuteDataQuery(tableSession, R"(SELECT COUNT(*) FROM `table_A`)", NTable::TTxControl::Tx(tx1));
+
+ auto records = MakeTableRecords();
WriteToTable("table_A", records, &tx2);
+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx1);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
CommitTx(tx2, EStatus::SUCCESS);
CommitTx(tx1, EStatus::ABORTED);
- auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
- CheckTabletKeys("topic_A");
+ Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
+
+ CheckTabletKeys("topic_A");
}
-Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink)
+Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks)
{
CreateTopic("topic_A");
- CreateTable("/Root/table_A");
+ CreateRowTable("/Root/table_A");
NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);
@@ -2756,9 +2817,98 @@ Y_UNIT_TEST_F(OltpSink_WriteToTopicAndTable_5, TFixtureOltpSink)
RollbackTx(tx, EStatus::SUCCESS);
Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0);
+
CheckTabletKeys("topic_A");
+}
+
+Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks)
+{
+ CreateTopic("topic_A");
+ CreateColumnTable("/Root/table_A");
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ auto records = MakeTableRecords();
+ WriteToTable("table_A", records, &tx);
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
+
+ CommitTx(tx, EStatus::SUCCESS);
+
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
+
+ UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
+
+ CheckTabletKeys("topic_A");
+}
+
+Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks)
+{
+ CreateTopic("topic_A");
+ CreateTopic("topic_B");
+
+ CreateRowTable("/Root/table_A");
+ CreateColumnTable("/Root/table_B");
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ auto records = MakeTableRecords();
+
+ WriteToTable("table_A", records, &tx);
+ WriteToTable("table_B", records, &tx);
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
+
+ const size_t topicMsgCnt = 10;
+ for (size_t i = 1; i <= topicMsgCnt; ++i) {
+ WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx);
+ }
+
+ CommitTx(tx, EStatus::SUCCESS);
+
+ {
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records));
+ }
+
+ {
+ auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt);
+ UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1");
+ UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size());
+ UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_B"), records.size());
+
+ CheckTabletKeys("topic_A");
+ CheckTabletKeys("topic_B");
+}
+
+Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
+{
+ CreateTopic("topic_A");
+ CreateColumnTable("/Root/table_A");
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ auto records = MakeTableRecords();
+ WriteToTable("table_A", records, &tx);
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
+
+ RollbackTx(tx, EStatus::SUCCESS);
+
+ Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0);
UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0);
+
+ CheckTabletKeys("topic_A");
}
}