summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp3
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp125
3 files changed, 124 insertions, 6 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
index 375fb74fe8e..1e97edbc084 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -180,7 +180,7 @@ void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
}
auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
- NKikimr::AppData(ctx)->PQConfig, ""
+ true, "", ""
);
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
index d357990be9e..1785ff8e1c6 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -389,6 +389,7 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada
if (topic.has_value()) {
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
OriginalTopicNames[normalizedTopicName] = topic.value();
+ OriginalTopicNames[normalizedTopicName + "/streamImpl"] = topic.value();
topics.emplace(normalizedTopicName);
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
}
@@ -442,7 +443,7 @@ void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorCont
void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) {
auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
- AppData(ctx)->PQConfig, ""
+ true, "", ""
);
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
topicConverterFactory
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 87dd248a251..2e5b9589931 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -282,6 +282,21 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32
}
+void AlterTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, std::vector<TString> consumers) {
+ auto topicSettings = NYdb::NTopic::TAlterTopicSettings();
+
+ for (auto& consumer : consumers) {
+ topicSettings.BeginAddConsumer(consumer).EndAddConsumer();
+ }
+
+ auto result = pqClient
+ .AlterTopic(topicName, topicSettings)
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+}
Y_UNIT_TEST_SUITE(KafkaProtocol) {
@@ -767,8 +782,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
} // Y_UNIT_TEST(FetchScenario)
- Y_UNIT_TEST(BalanceScenario) {
-
+ void RunBalanceScenarionTest(bool forFederation) {
TString protocolName = "roundrobin";
TInsecureTestServer testServer("2");
@@ -788,6 +802,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
CreateTopic(pqClient, topicName, minActivePartitions, {group});
CreateTopic(pqClient, secondTopicName, minActivePartitions, {group});
+ if (forFederation) {
+ testServer.KikimrServer->GetServer().GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(false);
+ }
TKafkaTestClient clientA(testServer.Port);
TKafkaTestClient clientB(testServer.Port);
TKafkaTestClient clientC(testServer.Port);
@@ -821,6 +838,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
// clientA join group, and get all partitions
auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions, protocolName, minActivePartitions);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, topicName);
// clientB join group, and get 0 partitions, becouse it's all at clientA
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -981,7 +999,105 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
}
- } // Y_UNIT_TEST(BalanceScenario)
+ } // RunBalanceScenarionTest()
+
+ Y_UNIT_TEST(BalanceScenario) {
+ RunBalanceScenarionTest(false);
+ }
+
+ Y_UNIT_TEST(BalanceScenarioForFederation) {
+ RunBalanceScenarionTest(true);
+ }
+
+ Y_UNIT_TEST(BalanceScenarioCdc) {
+
+ TString protocolName = "roundrobin";
+ TInsecureTestServer testServer("2");
+
+
+ TString tableName = "/Root/table-0-test";
+ TString feedName = "feed";
+ TString feedPath = tableName + "/" + feedName;
+ TString tableShortName = "table-0-test";
+ TString feedShortPath = tableShortName + "/" + feedName;
+
+ TString group = "consumer-0";
+ TString notExistsGroup = "consumer-not-exists";
+
+ // create table and init cdc for it
+ {
+ NYdb::NTable::TTableClient tableClient(*testServer.Driver);
+ tableClient.RetryOperationSync([&](TSession session)
+ {
+ NYdb::NTable::TTableBuilder builder;
+ builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn("key");
+ builder.AddNonNullableColumn("value", NYdb::EPrimitiveType::Int64);
+
+ auto createResult = session.CreateTable(tableName, builder.Build()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false);
+ Cerr << createResult.GetIssues().ToString() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(createResult.GetStatus(), EStatus::SUCCESS);
+
+ auto alterResult = session.AlterTable(tableName, NYdb::NTable::TAlterTableSettings()
+ .AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName,
+ NYdb::NTable::EChangefeedMode::Updates,
+ NYdb::NTable::EChangefeedFormat::Json))
+ ).ExtractValueSync();
+ Cerr << alterResult.GetIssues().ToString() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(alterResult.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
+ return alterResult;
+ }
+ );
+
+ TValueBuilder rows;
+ rows.BeginList();
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember("key").Int64(1)
+ .AddMember("value").Int64(2)
+ .EndStruct();
+ rows.EndList();
+
+ auto upsertResult = tableClient.BulkUpsert(tableName, rows.Build()).GetValueSync();
+ UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
+ }
+
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ AlterTopic(pqClient, feedPath, {group});
+
+ for(auto name : {feedPath, feedShortPath} ) {
+ TKafkaTestClient clientA(testServer.Port);
+ {
+ auto msg = clientA.ApiVersions();
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
+ }
+ {
+ auto msg = clientA.SaslHandshake();
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+ {
+ auto msg = clientA.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ {
+ // Check partitions balance
+ std::vector<TString> topics;
+ topics.push_back(name);
+
+ // clientA join group, and get all partitions
+ auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, 1, protocolName, 1);
+ UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, name);
+ }
+ }
+ } // Y_UNIT_TEST(BalanceScenarioCdc)
Y_UNIT_TEST(OffsetCommitAndFetchScenario) {
TInsecureTestServer testServer("2");
@@ -2092,9 +2208,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(syncRespB->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
UNIT_ASSERT_VALUES_EQUAL(syncRespC->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
- auto countPartitions = [](const TConsumerProtocolAssignment& assignment) {
+ auto countPartitions = [topicName](const TConsumerProtocolAssignment& assignment) {
size_t sum = 0;
for (auto& ta : assignment.AssignedPartitions) {
+ UNIT_ASSERT_VALUES_EQUAL(ta.Topic, topicName);
sum += ta.Partitions.size();
}
return sum;