diff options
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp | 3 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 125 |
3 files changed, 124 insertions, 6 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp index 375fb74fe8e..1e97edbc084 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp @@ -180,7 +180,7 @@ void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) { } auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( - NKikimr::AppData(ctx)->PQConfig, "" + true, "", "" ); auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>( diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index d357990be9e..1785ff8e1c6 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -389,6 +389,7 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada if (topic.has_value()) { auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value()); OriginalTopicNames[normalizedTopicName] = topic.value(); + OriginalTopicNames[normalizedTopicName + "/streamImpl"] = topic.value(); topics.emplace(normalizedTopicName); KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic); } @@ -442,7 +443,7 @@ void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorCont void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) { auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( - AppData(ctx)->PQConfig, "" + true, "", "" ); auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>( topicConverterFactory diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 87dd248a251..2e5b9589931 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -282,6 +282,21 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 } +void AlterTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, std::vector<TString> consumers) { + auto topicSettings = NYdb::NTopic::TAlterTopicSettings(); + + for (auto& consumer : consumers) { + topicSettings.BeginAddConsumer(consumer).EndAddConsumer(); + } + + auto result = pqClient + .AlterTopic(topicName, topicSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + +} Y_UNIT_TEST_SUITE(KafkaProtocol) { @@ -767,8 +782,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(FetchScenario) - Y_UNIT_TEST(BalanceScenario) { - + void RunBalanceScenarionTest(bool forFederation) { TString protocolName = "roundrobin"; TInsecureTestServer testServer("2"); @@ -788,6 +802,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { CreateTopic(pqClient, topicName, minActivePartitions, {group}); CreateTopic(pqClient, secondTopicName, minActivePartitions, {group}); + if (forFederation) { + testServer.KikimrServer->GetServer().GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(false); + } TKafkaTestClient clientA(testServer.Port); TKafkaTestClient clientB(testServer.Port); TKafkaTestClient clientC(testServer.Port); @@ -821,6 +838,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { // clientA join group, and get all partitions auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions, protocolName, minActivePartitions); UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, topicName); // clientB join group, and get 0 partitions, becouse it's all at clientA UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); @@ -981,7 +999,105 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin } - } // Y_UNIT_TEST(BalanceScenario) + } // RunBalanceScenarionTest() + + Y_UNIT_TEST(BalanceScenario) { + RunBalanceScenarionTest(false); + } + + Y_UNIT_TEST(BalanceScenarioForFederation) { + RunBalanceScenarionTest(true); + } + + Y_UNIT_TEST(BalanceScenarioCdc) { + + TString protocolName = "roundrobin"; + TInsecureTestServer testServer("2"); + + + TString tableName = "/Root/table-0-test"; + TString feedName = "feed"; + TString feedPath = tableName + "/" + feedName; + TString tableShortName = "table-0-test"; + TString feedShortPath = tableShortName + "/" + feedName; + + TString group = "consumer-0"; + TString notExistsGroup = "consumer-not-exists"; + + // create table and init cdc for it + { + NYdb::NTable::TTableClient tableClient(*testServer.Driver); + tableClient.RetryOperationSync([&](TSession session) + { + NYdb::NTable::TTableBuilder builder; + builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn("key"); + builder.AddNonNullableColumn("value", NYdb::EPrimitiveType::Int64); + + auto createResult = session.CreateTable(tableName, builder.Build()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false); + Cerr << createResult.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(createResult.GetStatus(), EStatus::SUCCESS); + + auto alterResult = session.AlterTable(tableName, NYdb::NTable::TAlterTableSettings() + .AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName, + NYdb::NTable::EChangefeedMode::Updates, + NYdb::NTable::EChangefeedFormat::Json)) + ).ExtractValueSync(); + Cerr << alterResult.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(alterResult.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS); + return alterResult; + } + ); + + TValueBuilder rows; + rows.BeginList(); + rows.AddListItem() + .BeginStruct() + .AddMember("key").Int64(1) + .AddMember("value").Int64(2) + .EndStruct(); + rows.EndList(); + + auto upsertResult = tableClient.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS); + } + + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + AlterTopic(pqClient, feedPath, {group}); + + for(auto name : {feedPath, feedShortPath} ) { + TKafkaTestClient clientA(testServer.Port); + { + auto msg = clientA.ApiVersions(); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); + } + { + auto msg = clientA.SaslHandshake(); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + { + auto msg = clientA.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check partitions balance + std::vector<TString> topics; + topics.push_back(name); + + // clientA join group, and get all partitions + auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, 1, protocolName, 1); + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, name); + } + } + } // Y_UNIT_TEST(BalanceScenarioCdc) Y_UNIT_TEST(OffsetCommitAndFetchScenario) { TInsecureTestServer testServer("2"); @@ -2092,9 +2208,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(syncRespB->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR); UNIT_ASSERT_VALUES_EQUAL(syncRespC->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR); - auto countPartitions = [](const TConsumerProtocolAssignment& assignment) { + auto countPartitions = [topicName](const TConsumerProtocolAssignment& assignment) { size_t sum = 0; for (auto& ta : assignment.AssignedPartitions) { + UNIT_ASSERT_VALUES_EQUAL(ta.Topic, topicName); sum += ta.Partitions.size(); } return sum; |
