summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <[email protected]>2023-05-12 18:04:44 +0300
committerkomels <[email protected]>2023-05-12 18:04:44 +0300
commit7d3342c4ec4453b9c70ecf4396e5ff06171434c5 (patch)
treef5fcb4d6b0fc09cf75c3c902b6511ecc07b9ce99
parent77d647c5af534e8cef4a2624b05f7b9d0fe389dc (diff)
Fix incorrect partition selection
Partition selection was incorrect for first class cititzen mode.
-rw-r--r--ydb/services/persqueue_v1/actors/helpers.cpp4
-rw-r--r--ydb/services/persqueue_v1/first_class_src_ids_ut.cpp34
2 files changed, 35 insertions, 3 deletions
diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp
index d4db92ba16e..d9577536864 100644
--- a/ydb/services/persqueue_v1/actors/helpers.cpp
+++ b/ydb/services/persqueue_v1/actors/helpers.cpp
@@ -36,10 +36,10 @@ TMaybe<ui32> GetPartitionFromConfigOptions(
TMaybe<ui32> ret;
if (preferred < Max<ui32>()) {
ret = preferred;
- } else if (!useRoundRobin){
- ret = encodedSrcId.Hash % partPerTablet;
} else if (firstClass) {
ret = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(encodedSrcId.OriginalSourceId, partPerTablet);
+ } else if (!useRoundRobin){
+ ret = encodedSrcId.Hash % partPerTablet;
}
return ret;
}
diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
index afa962759f5..d466a5cd307 100644
--- a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
+++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
@@ -2,6 +2,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
+#include <ydb/services/persqueue_v1/actors/helpers.h>
namespace NKikimr::NPersQueueTests {
using namespace Tests;
@@ -79,12 +80,43 @@ Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) {
UNIT_ASSERT(res);
writer->Close();
};
- Y_UNUSED(alterAndCheck);
+
alterAndCheck(2);
alterAndCheck(4);
alterAndCheck(12);
ydbDriver->Stop(true);
}
+
+ Y_UNIT_TEST(ProperPartitionSelected) {
+ TString topic = "/Root/topic-f3";
+ auto [server, ydbDriver] = Setup("/Root/otherTopic", false);
+ //auto& server_ = server;
+ auto& driver = ydbDriver;
+
+ ui32 partCount = 15;
+ TString srcId = "mySrcID";
+ server->AnnoyingClient->CreateTopicNoLegacy(topic, partCount);
+
+ auto pqClient = NYdb::NTopic::TTopicClient(*driver);
+ auto topicSettings = NYdb::NTopic::TAlterTopicSettings();
+ topicSettings.BeginAddConsumer("debug");
+ auto alterRes = pqClient.AlterTopic(topic, topicSettings).GetValueSync();
+ UNIT_ASSERT(alterRes.IsSuccess());
+
+ auto partExpected = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(srcId, partCount);
+ Y_VERIFY(partExpected < partCount);
+ auto writer = CreateSimpleWriter(*driver, topic, srcId);
+ auto res = writer->Write("test-data", writer->GetInitSeqNo() + 1);
+ UNIT_ASSERT(res);
+ writer->Close();
+
+ NYdb::NPersQueue::TReadSessionSettings readerSettings;
+ readerSettings.ConsumerName("debug").AppendTopics(topic);
+ auto reader = CreateReader(*driver, readerSettings);
+ auto mbEv = GetNextMessageSkipAssignment(reader);
+ UNIT_ASSERT(mbEv.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(mbEv->GetPartitionStream()->GetPartitionGroupId(), partExpected + 1);
+ }
}
} // namespace NKikimr::NPersQueueTests