diff options
author | komels <[email protected]> | 2023-05-12 18:04:44 +0300 |
---|---|---|
committer | komels <[email protected]> | 2023-05-12 18:04:44 +0300 |
commit | 7d3342c4ec4453b9c70ecf4396e5ff06171434c5 (patch) | |
tree | f5fcb4d6b0fc09cf75c3c902b6511ecc07b9ce99 | |
parent | 77d647c5af534e8cef4a2624b05f7b9d0fe389dc (diff) |
Fix incorrect partition selection
Partition selection was incorrect for first class cititzen mode.
-rw-r--r-- | ydb/services/persqueue_v1/actors/helpers.cpp | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/first_class_src_ids_ut.cpp | 34 |
2 files changed, 35 insertions, 3 deletions
diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp index d4db92ba16e..d9577536864 100644 --- a/ydb/services/persqueue_v1/actors/helpers.cpp +++ b/ydb/services/persqueue_v1/actors/helpers.cpp @@ -36,10 +36,10 @@ TMaybe<ui32> GetPartitionFromConfigOptions( TMaybe<ui32> ret; if (preferred < Max<ui32>()) { ret = preferred; - } else if (!useRoundRobin){ - ret = encodedSrcId.Hash % partPerTablet; } else if (firstClass) { ret = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(encodedSrcId.OriginalSourceId, partPerTablet); + } else if (!useRoundRobin){ + ret = encodedSrcId.Hash % partPerTablet; } return ret; } diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp index afa962759f5..d466a5cd307 100644 --- a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp +++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp @@ -2,6 +2,7 @@ #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> +#include <ydb/services/persqueue_v1/actors/helpers.h> namespace NKikimr::NPersQueueTests { using namespace Tests; @@ -79,12 +80,43 @@ Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) { UNIT_ASSERT(res); writer->Close(); }; - Y_UNUSED(alterAndCheck); + alterAndCheck(2); alterAndCheck(4); alterAndCheck(12); ydbDriver->Stop(true); } + + Y_UNIT_TEST(ProperPartitionSelected) { + TString topic = "/Root/topic-f3"; + auto [server, ydbDriver] = Setup("/Root/otherTopic", false); + //auto& server_ = server; + auto& driver = ydbDriver; + + ui32 partCount = 15; + TString srcId = "mySrcID"; + server->AnnoyingClient->CreateTopicNoLegacy(topic, partCount); + + auto pqClient = NYdb::NTopic::TTopicClient(*driver); + auto topicSettings = NYdb::NTopic::TAlterTopicSettings(); + topicSettings.BeginAddConsumer("debug"); + auto alterRes = pqClient.AlterTopic(topic, topicSettings).GetValueSync(); + UNIT_ASSERT(alterRes.IsSuccess()); + + auto partExpected = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(srcId, partCount); + Y_VERIFY(partExpected < partCount); + auto writer = CreateSimpleWriter(*driver, topic, srcId); + auto res = writer->Write("test-data", writer->GetInitSeqNo() + 1); + UNIT_ASSERT(res); + writer->Close(); + + NYdb::NPersQueue::TReadSessionSettings readerSettings; + readerSettings.ConsumerName("debug").AppendTopics(topic); + auto reader = CreateReader(*driver, readerSettings); + auto mbEv = GetNextMessageSkipAssignment(reader); + UNIT_ASSERT(mbEv.Defined()); + UNIT_ASSERT_VALUES_EQUAL(mbEv->GetPartitionStream()->GetPartitionGroupId(), partExpected + 1); + } } } // namespace NKikimr::NPersQueueTests |