aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2022-12-23 13:09:14 +0300
committerkomels <komels@ydb.tech>2022-12-23 13:09:14 +0300
commitc70e9bd2cc0dae340be5089063ae505f73121172 (patch)
tree4cdaef9a18cbabf5696dc2e432ea5f5072a769c4
parent9c0a33e446c7a410d4f1f80cbea29e070841ddb4 (diff)
downloadydb-c70e9bd2cc0dae340be5089063ae505f73121172.tar.gz
Partition mapping for 1st class
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/writer/metadata_initializers.cpp69
-rw-r--r--ydb/core/persqueue/writer/metadata_initializers.h51
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.cpp134
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.h21
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/services/lib/sharding/sharding.cpp5
-rw-r--r--ydb/services/lib/sharding/sharding.h1
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/helpers.cpp15
-rw-r--r--ydb/services/persqueue_v1/actors/helpers.h4
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h12
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp100
-rw-r--r--ydb/services/persqueue_v1/first_class_src_ids_ut.cpp90
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp11
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux.txt1
22 files changed, 466 insertions, 61 deletions
diff --git a/ydb/core/persqueue/writer/CMakeLists.darwin.txt b/ydb/core/persqueue/writer/CMakeLists.darwin.txt
index 7a525a071b7..5d6a962b334 100644
--- a/ydb/core/persqueue/writer/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.darwin.txt
@@ -20,5 +20,6 @@ target_link_libraries(core-persqueue-writer PUBLIC
)
target_sources(core-persqueue-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp
)
diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
index f957f84a625..7c52f93cf56 100644
--- a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
@@ -21,5 +21,6 @@ target_link_libraries(core-persqueue-writer PUBLIC
)
target_sources(core-persqueue-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp
)
diff --git a/ydb/core/persqueue/writer/CMakeLists.linux.txt b/ydb/core/persqueue/writer/CMakeLists.linux.txt
index f957f84a625..7c52f93cf56 100644
--- a/ydb/core/persqueue/writer/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.linux.txt
@@ -21,5 +21,6 @@ target_link_libraries(core-persqueue-writer PUBLIC
)
target_sources(core-persqueue-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp
)
diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp
new file mode 100644
index 00000000000..651242956d8
--- /dev/null
+++ b/ydb/core/persqueue/writer/metadata_initializers.cpp
@@ -0,0 +1,69 @@
+#include "metadata_initializers.h"
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+using namespace NMetadata;
+
+void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const {
+ TVector<NInitializer::ITableModifier::TPtr> result;
+ auto tablePath = TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath();
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(tablePath);
+ request.add_primary_key("Hash");
+ request.add_primary_key("Topic");
+ request.add_primary_key("ProducerId");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("Hash");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("Topic");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ProducerId");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("AccessTime");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("CreateTime");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("Partition");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto* partSettings = request.mutable_partitioning_settings();
+ partSettings->add_partition_by("Hash");
+ partSettings->set_partitioning_by_size(Ydb::FeatureFlag::ENABLED);
+ partSettings->set_partitioning_by_load(Ydb::FeatureFlag::DISABLED);
+ partSettings->set_max_partitions_count(1000);
+ }
+ {
+ auto* ttlSettings = request.mutable_ttl_settings();
+ auto* columnTtl = ttlSettings->mutable_value_since_unix_epoch();
+ columnTtl->set_column_name("AccessTime");
+ columnTtl->set_expire_after_seconds(1382400);
+ columnTtl->set_column_unit(Ydb::Table::ValueSinceUnixEpochModeSettings::UNIT_MILLISECONDS);
+ }
+
+ result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create"));
+ }
+ result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl"));
+ controller->PreparationFinished(result);
+}
+
+
+} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/core/persqueue/writer/metadata_initializers.h b/ydb/core/persqueue/writer/metadata_initializers.h
new file mode 100644
index 00000000000..aa78cf6c48b
--- /dev/null
+++ b/ydb/core/persqueue/writer/metadata_initializers.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <ydb/services/metadata/service.h>
+#include <ydb/services/metadata/abstract/initialization.h>
+
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+using TInitBehaviourPtr = std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour>;
+using TClassBehaviourPtr = std::shared_ptr<NMetadata::IClassBehaviour>;
+
+class TSrcIdMetaInitializer : public NMetadata::NInitializer::IInitializationBehaviour {
+public:
+ static TInitBehaviourPtr GetInstant() {
+ static TInitBehaviourPtr res{new TSrcIdMetaInitializer()};
+ return res;
+ }
+
+protected:
+ virtual void DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const override;
+
+private:
+ TSrcIdMetaInitializer() = default;
+};
+
+class TSrcIdMetaInitManager : public NMetadata::IClassBehaviour {
+protected:
+ virtual TString GetInternalStorageTablePath() const override {
+ return "TopicPartitionsMapping";
+ }
+ TInitBehaviourPtr ConstructInitializer() const override {
+ return TSrcIdMetaInitializer::GetInstant();
+ }
+ std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override {
+ return nullptr;
+ }
+public:
+ static TClassBehaviourPtr GetInstant() {
+ static TClassBehaviourPtr res{new TSrcIdMetaInitManager()};
+ return res;
+ }
+ virtual TString GetTypeId() const override {
+ return TypeName<TSrcIdMetaInitManager>();
+ }
+
+private:
+ TSrcIdMetaInitManager() = default;
+
+};
+
+} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp
index b528da20991..c4eb2752658 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.cpp
+++ b/ydb/core/persqueue/writer/source_id_encoding.cpp
@@ -1,50 +1,105 @@
#include "source_id_encoding.h"
+#include "metadata_initializers.h"
#include <library/cpp/string_utils/base64/base64.h>
-
#include <util/generic/yexception.h>
#include <util/string/strip.h>
#include <util/string/builder.h>
#include <util/string/hex.h>
#include <util/digest/murmur.h>
+#include <util/digest/city.h>
+#include <util/string/join.h>
+
#include <library/cpp/digest/md5/md5.h>
namespace NKikimr {
namespace NPQ {
-TString GetSourceIdSelectQueryFromPath(const TString& path) {
+TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
TStringBuilder res;
- res << "--!syntax_v1\n"
- "DECLARE $Hash AS Uint32; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $SourceId AS Utf8; "
- "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` "
- "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;";
+ switch (generation) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ res << "--!syntax_v1\n"
+ "DECLARE $Hash AS Uint32; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $SourceId AS Utf8; "
+ "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` "
+ "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;";
+ break;
+ case ESourceIdTableGeneration::PartitionMapping:
+ res << "--!syntax_v1\n"
+ "DECLARE $Hash AS Uint64; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $SourceId AS Utf8; "
+ "SELECT Partition, CreateTime, AccessTime FROM `"
+ << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ << "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;";
+ break;
+ default:
+ Y_FAIL();
+ }
return res;
}
-TString GetSourceIdSelectQuery(const TString& root) {
- return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2");
+TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) {
+ switch (generation) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2");
+ case ESourceIdTableGeneration::PartitionMapping:
+ return GetUpdateIdSelectQueryFromPath(
+ NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ );
+ default:
+ Y_FAIL();
+ }
}
-TString GetUpdateIdSelectQueryFromPath(const TString& path) {
+TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
TStringBuilder res;
- res << "--!syntax_v1\n"
- "DECLARE $SourceId AS Utf8; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $Hash AS Uint32; "
- "DECLARE $Partition AS Uint32; "
- "DECLARE $CreateTime AS Uint64; "
- "DECLARE $AccessTime AS Uint64;\n"
- "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
- "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
+ switch (generation) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ res << "--!syntax_v1\n"
+ "DECLARE $SourceId AS Utf8; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $Hash AS Uint32; "
+ "DECLARE $Partition AS Uint32; "
+ "DECLARE $CreateTime AS Uint64; "
+ "DECLARE $AccessTime AS Uint64;\n"
+ "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
+ "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
+ break;
+ case ESourceIdTableGeneration::PartitionMapping:
+ res << "--!syntax_v1\n"
+ "DECLARE $SourceId AS Utf8; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $Hash AS Uint64; "
+ "DECLARE $Partition AS Uint32; "
+ "DECLARE $CreateTime AS Uint64; "
+ "DECLARE $AccessTime AS Uint64;\n"
+ "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES "
+ "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
+ break;
+
+ default:
+ Y_FAIL();
+ }
return res;
}
-TString GetUpdateIdSelectQuery(const TString& root) {
- return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2");
+TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) {
+ switch (generation) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2");
+ case ESourceIdTableGeneration::PartitionMapping:
+ return GetUpdateIdSelectQueryFromPath(
+ NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ );
+ default:
+ Y_FAIL();
+ }
}
@@ -128,17 +183,44 @@ bool IsValidEncoded(const TString& sourceId) {
}
-TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId) {
+template <class... TArgs>
+ui64 GetKeysHash(TArgs&&... args) {
+ return CityHash64(Join("#", args...));
+}
+
+TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation) {
TEncodedSourceId res;
+ res.OriginalSourceId = userSourceId;
TString encodedSourceId = Encode(userSourceId);
res.EscapedSourceId = HexEncode(encodedSourceId);
-
- TString s = topic + encodedSourceId;
- res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED);
+ switch (generation) {
+ case ESourceIdTableGeneration::SrcIdMeta2: {
+ TString s = topic + encodedSourceId;
+ res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED);
+ break;
+ }
+ case ESourceIdTableGeneration::PartitionMapping:
+ res.KeysHash = GetKeysHash(topic, encodedSourceId);
+ break;
+ }
+ res.Generation = generation;
return res;
}
+void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId) {
+ switch (encodedSrcId.Generation) {
+ case ESourceIdTableGeneration::PartitionMapping:
+ parameters["$Hash"] = encodedSrcId.KeysHash;
+ return;
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ parameters["$Hash"] = encodedSrcId.Hash;
+ return;
+
+ }
+}
+
+
} // NSourceIdEncoding
} // NPQ
} // NKikimr
diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h
index c48a1c574ec..a879521e610 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.h
+++ b/ydb/core/persqueue/writer/source_id_encoding.h
@@ -2,15 +2,21 @@
#include <util/generic/fwd.h>
#include <util/generic/string.h>
+#include <ydb/public/lib/deprecated/kicli/kicli.h>
namespace NKikimr {
namespace NPQ {
-TString GetSourceIdSelectQuery(const TString& root);
-TString GetUpdateIdSelectQuery(const TString& root);
+enum class ESourceIdTableGeneration {
+ SrcIdMeta2,
+ PartitionMapping
+};
+
+TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetSourceIdSelectQueryFromPath(const TString& path);
-TString GetUpdateIdSelectQueryFromPath(const TString& path);
+TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
namespace NSourceIdEncoding {
@@ -24,11 +30,16 @@ bool IsValidEncoded(const TString& encodedSourceId);
struct TEncodedSourceId {
+ TString OriginalSourceId;
TString EscapedSourceId;
ui32 Hash = 0;
+ ui64 KeysHash = 0;
+ ESourceIdTableGeneration Generation;
};
-TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId);
+void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId);
+
+TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation);
} // NSourceIdEncoding
} // NPQ
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 23545a9a76c..cb9056121fd 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -109,6 +109,7 @@ message TPQConfig {
repeated TChannelProfile ChannelProfiles = 31;
optional bool TopicsAreFirstClassCitizen = 32 [default = true];
+ optional bool UseSrcIdMetaMappingInFirstClass = 52 [default = false];
optional string SourceIdTablePath = 33 [default = "/Root/PQ/SourceIdMeta2"];
@@ -182,6 +183,7 @@ message TPQConfig {
}
optional TMoveTopicActorConfig MoveTopicActorConfig = 51;
+
}
message TChannelProfile {
diff --git a/ydb/services/lib/sharding/sharding.cpp b/ydb/services/lib/sharding/sharding.cpp
index ce0e5a1c0a8..e7084470407 100644
--- a/ydb/services/lib/sharding/sharding.cpp
+++ b/ydb/services/lib/sharding/sharding.cpp
@@ -4,6 +4,7 @@
#include <util/generic/maybe.h>
#include <util/generic/yexception.h>
#include <util/string/printf.h>
+#include <library/cpp/digest/md5/md5.h>
namespace NKikimr::NDataStreams::V1 {
@@ -92,4 +93,8 @@ namespace NKikimr::NDataStreams::V1 {
NYql::NDecimal::TUint128(1);
return {left, right};
}
+
+ ui32 CalculateShardFromSrcId(const TString& sourceId, ui32 partitionToTablet) {
+ return ShardFromDecimal(HexBytesToDecimal(MD5::Calc(sourceId)), partitionToTablet);
+ }
}
diff --git a/ydb/services/lib/sharding/sharding.h b/ydb/services/lib/sharding/sharding.h
index 257ee8d2be7..bf78f690321 100644
--- a/ydb/services/lib/sharding/sharding.h
+++ b/ydb/services/lib/sharding/sharding.h
@@ -18,4 +18,5 @@ namespace NKikimr::NDataStreams::V1 {
NYql::NDecimal::TUint128 BytesToDecimal(const TString &bytes);
bool IsValidDecimal(const TString& bytes);
TString Uint128ToDecimalString(NYql::NDecimal::TUint128 decimal, const NYql::NDecimal::TUint128& base = 10);
+ ui32 CalculateShardFromSrcId(const TString& sourceId, ui32 partitionToTablet);
}
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
index e65325c51d9..182006ca30c 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt
@@ -25,6 +25,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
api-protos
public-lib-base
services-lib-actors
+ services-lib-sharding
+ ydb-services-metadata
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
index c584633e136..be5980b588d 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
api-protos
public-lib-base
services-lib-actors
+ services-lib-sharding
+ ydb-services-metadata
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
index c584633e136..be5980b588d 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt
@@ -26,6 +26,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
api-protos
public-lib-base
services-lib-actors
+ services-lib-sharding
+ ydb-services-metadata
)
target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp
diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp
index b020d327986..d4db92ba16e 100644
--- a/ydb/services/persqueue_v1/actors/helpers.cpp
+++ b/ydb/services/persqueue_v1/actors/helpers.cpp
@@ -29,4 +29,19 @@ bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) {
return !data.partition_data().empty();
}
+TMaybe<ui32> GetPartitionFromConfigOptions(
+ ui32 preferred, const NPQ::NSourceIdEncoding::TEncodedSourceId& encodedSrcId,
+ ui32 partPerTablet, bool firstClass, bool useRoundRobin
+) {
+ TMaybe<ui32> ret;
+ if (preferred < Max<ui32>()) {
+ ret = preferred;
+ } else if (!useRoundRobin){
+ ret = encodedSrcId.Hash % partPerTablet;
+ } else if (firstClass) {
+ ret = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(encodedSrcId.OriginalSourceId, partPerTablet);
+ }
+ return ret;
+}
+
}
diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h
index 98ff39639ea..812749d0095 100644
--- a/ydb/services/persqueue_v1/actors/helpers.h
+++ b/ydb/services/persqueue_v1/actors/helpers.h
@@ -2,6 +2,8 @@
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
+#include <ydb/services/lib/sharding/sharding.h>
namespace NKikimr::NGRpcProxy::V1 {
@@ -11,4 +13,6 @@ bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::Dat
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data);
+TMaybe<ui32> GetPartitionFromConfigOptions(ui32 preferred, const NPQ::NSourceIdEncoding::TEncodedSourceId& encodedSrcId,
+ ui32 partPerTablet, bool firstClass, bool useRoundRobin);
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 0802f359323..1c08ee1853c 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -14,6 +14,7 @@
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/services/lib/actors/pq_rl_helpers.h>
+#include <ydb/services/metadata/service.h>
namespace NKikimr::NGRpcProxy::V1 {
@@ -122,6 +123,7 @@ private:
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
+ HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
default:
break;
@@ -141,6 +143,8 @@ private:
void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx);
void TryCloseSession(const TActorContext& ctx);
+ void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr &ev, const NActors::TActorContext& ctx);
+
void CheckACL(const TActorContext& ctx);
void RecheckACL(const TActorContext& ctx);
// Requests fresh ACL from 'SchemeCache'
@@ -153,14 +157,18 @@ private:
void Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx);
void LogSession(const TActorContext& ctx);
void InitAfterDiscovery(const TActorContext& ctx);
+ ui32 CalculateFirstClassPartition(const TActorContext& ctx);
+ void SendCreateManagerRequest(const TActorContext& ctx);
void DiscoverPartition(const NActors::TActorContext& ctx);
- void SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx);
+ void StartSession(const NActors::TActorContext& ctx);
+ void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx);
void UpdatePartition(const NActors::TActorContext& ctx);
void RequestNextPartition(const NActors::TActorContext& ctx);
void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx);
- THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(ui32 hash, const TString& topic,
+
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(const TString& topic,
const NActors::TActorContext& ctx);
void SendUpdateSrcIdsRequests(const TActorContext& ctx);
//void InitCheckACL(const TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index e95cf8a00ae..de0f4187812 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -2,10 +2,13 @@
#error "Do not include this file directly"
#endif
-#include <ydb/library/persqueue/topic_parser/counters.h>
-
#include "codecs.h"
+#include "helpers.h"
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/core/persqueue/writer/metadata_initializers.h>
+
+#include <ydb/library/persqueue/topic_parser/counters.h>
#include <ydb/core/persqueue/pq_database.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/core/protos/services.pb.h>
@@ -20,6 +23,7 @@
#include <util/string/escape.h>
#include <util/string/printf.h>
+
using namespace NActors;
using namespace NKikimrClient;
@@ -204,8 +208,13 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ctx) {
Y_VERIFY(Request);
- SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath());
- UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath());
+ //ToDo !! - Set proper table paths.
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+ ESourceIdTableGeneration gen = pqConfig.GetTopicsAreFirstClassCitizen() ?
+ ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2;
+ SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen);
+ UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "Select srcid query: " << SelectSourceIdQuery);
Request->GetStreamCtx()->Attach(ctx.SelfID);
if (!Request->GetStreamCtx()->Read()) {
@@ -464,8 +473,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
+ ESourceIdTableGeneration gen = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ?
+ ESourceIdTableGeneration::PartitionMapping
+ :
+ ESourceIdTableGeneration::SrcIdMeta2;
try {
- EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId);
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, gen);
} catch (yexception& e) {
CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
@@ -622,14 +635,31 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:
}
template<bool UseMigrationProtocol>
+ui32 TWriteSessionActor<UseMigrationProtocol>::CalculateFirstClassPartition(const TActorContext&) {
+ return NKikimr::NDataStreams::V1::ShardFromDecimal(
+ NKikimr::NDataStreams::V1::HexBytesToDecimal(MD5::Calc(SourceId)), PartitionToTablet.size()
+ );
+}
+
+template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::TActorContext& ctx) {
+ const auto &pqConfig = AppData(ctx)->PQConfig;
+ if (pqConfig.GetTopicsAreFirstClassCitizen()) {
+ if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
+ return SendCreateManagerRequest(ctx);
+ }
+ auto partitionId = PreferedPartition < Max<ui32>() ? PreferedPartition : CalculateFirstClassPartition(ctx);
- if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration] - separate flag for having config tables
- auto partitionId = PreferedPartition < Max<ui32>() ? PreferedPartition
- : NKikimr::NDataStreams::V1::ShardFromDecimal(NKikimr::NDataStreams::V1::HexBytesToDecimal(MD5::Calc(SourceId)), PartitionToTablet.size());
ProceedPartition(partitionId, ctx);
return;
}
+ else {
+ StartSession(ctx);
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::StartSession(const NActors::TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
@@ -638,6 +668,20 @@ void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::
State = ES_WAIT_SESSION;
}
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::SendCreateManagerRequest(const TActorContext& ctx) {
+ ctx.Send(
+ NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
+ new NMetadata::NProvider::TEvPrepareManager(TSrcIdMetaInitManager::GetInstant())
+ );
+}
+
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::Handle(
+ NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx
+) {
+ StartSession(ctx);
+}
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx)
@@ -664,11 +708,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSes
Y_VERIFY(!KqpSessionId.empty());
- SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);
+ SendSelectPartitionRequest(FullConverter->GetClientsideName(), ctx);
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx) {
+void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx) {
//read from DS
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
@@ -682,8 +726,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(ui32 h
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
NClient::TParameters parameters;
+ SetHashToTxParams(parameters, EncodedSourceId);
- parameters["$Hash"] = hash;
parameters["$Topic"] = topic;
parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
@@ -729,7 +773,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvGetPartit
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
-
+ const auto& pqConfig = AppData(ctx)->PQConfig;
if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
<< SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying");
@@ -772,7 +816,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
return;
}
if (PartitionFound && PreferedPartition < Max<ui32>() && Partition != PreferedPartition) {
- CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.",
+ CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId "
+ << (Partition + 1) << ", but client provided " << (PreferedPartition + 1)
+ << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use "
+ "another MessageGroupId, specify PartitionGroupId " << (Partition + 1)
+ << ", or do not specify PartitionGroupId at all.",
PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
@@ -784,11 +832,15 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
<< SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " partition " << Partition << " partitions "
<< PartitionToTablet.size() << "(" << EncodedSourceId.Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
- if (!PartitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) {
- Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : EncodedSourceId.Hash % PartitionToTablet.size(); //choose partition default value
- PartitionFound = true;
+ if (!PartitionFound) {
+ auto partition = GetPartitionFromConfigOptions(PreferedPartition, EncodedSourceId, PartitionToTablet.size(),
+ pqConfig.GetTopicsAreFirstClassCitizen(),
+ pqConfig.GetRoundRobinPartitionMapping());
+ if (partition.Defined()) {
+ PartitionFound = true;
+ Partition = *partition;
+ }
}
-
if (PartitionFound) {
UpdatePartition(ctx);
} else {
@@ -815,7 +867,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
template<bool UseMigrationProtocol>
THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>::MakeUpdateSourceIdMetadataRequest(
- ui32 hash, const TString& topic, const NActors::TActorContext& ctx
+ const TString& topic, const NActors::TActorContext& ctx
) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -839,7 +891,8 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>:
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
NClient::TParameters parameters;
- parameters["$Hash"] = hash;
+ SetHashToTxParams(parameters, EncodedSourceId);
+ //parameters["$Hash"] = hash;
parameters["$Topic"] = topic;
parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
@@ -853,11 +906,10 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>:
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::SendUpdateSrcIdsRequests(const TActorContext& ctx) {
- {
- auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);
- SourceIdUpdatesInflight++;
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
+ auto ev = MakeUpdateSourceIdMetadataRequest(FullConverter->GetClientsideName(), ctx);
+
+ SourceIdUpdatesInflight++;
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}
template<bool UseMigrationProtocol>
diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
new file mode 100644
index 00000000000..7d15e63eb83
--- /dev/null
+++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
@@ -0,0 +1,90 @@
+#include <ydb/services/persqueue_v1/ut/test_utils.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
+
+namespace NKikimr::NPersQueueTests {
+using namespace Tests;
+using namespace NPersQueue;
+
+
+Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) {
+ THolder<NYdb::TDriver> GetDriver(TTestServer& server) {
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort);
+ return THolder<NYdb::TDriver>(new NYdb::TDriver(driverCfg));
+ }
+
+ std::pair<THolder<TTestServer>, THolder<NYdb::TDriver>> Setup(const TString& topic, bool useMapping) {
+ auto settings = PQSettings(0);
+ settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ settings.PQConfig.SetUseSrcIdMetaMappingInFirstClass(useMapping);
+ auto server = MakeHolder<TTestServer>(settings);
+
+ server->AnnoyingClient->CreateTopicNoLegacy(topic, 1);
+ auto ydbDriver = GetDriver(*server);
+ {
+ auto writer = CreateSimpleWriter(*ydbDriver, topic, "123", 0, "raw");
+ Cerr << "Created writer (initially).\n";
+ for (auto i = 0; i < 10; ++i) {
+ writer->Write("test-message", i + 1);
+ }
+ Cerr << "Writes done\n";
+ writer->Close();
+ Cerr << "Writer closed\n";
+ }
+ return {std::move(server), std::move(ydbDriver)};
+ }
+
+ Y_UNIT_TEST(TestTableCreated) {
+ TString topic = "/Root/topic-f1";
+ auto [server, ydbDriver] = Setup(topic, true); auto& server_ = server; auto& driver = ydbDriver;
+
+ auto checkMetaTable = [&](ui64 expectedCount) {
+ TStringBuilder query;
+ query << "select * from `/Root/.metadata/TopicPartitionsMapping`";
+ auto resultSet = server_->AnnoyingClient->RunYqlDataQuery(query);
+ UNIT_ASSERT(resultSet.Defined());
+ NYdb::TResultSetParser parser(*resultSet);
+
+ UNIT_ASSERT_VALUES_EQUAL(resultSet->RowsCount(), expectedCount);
+ return resultSet->RowsCount();
+ };
+ checkMetaTable(1);
+ server->AnnoyingClient->SetNoConfigMode();
+ ui64 currSeqNo = 10;
+ auto alterAndCheck = [&](ui32 pCount) {
+ server_->AnnoyingClient->AlterTopicNoLegacy(topic, pCount);
+ auto writer = CreateSimpleWriter(*driver, topic, "123", 0, "raw");
+ UNIT_ASSERT_VALUES_EQUAL(writer->GetInitSeqNo(), currSeqNo);
+ writer->Write("test-data", ++currSeqNo);
+ writer->Close();
+ checkMetaTable(1);
+ };
+ alterAndCheck(5);
+ alterAndCheck(10);
+ alterAndCheck(15);
+ ydbDriver = nullptr;
+ }
+
+ Y_UNIT_TEST(NoMapping) {
+ TString topic = "/Root/topic-f2";
+ auto [server, ydbDriver] = Setup(topic, false); auto& server_ = server; auto& driver = ydbDriver;
+ Cerr << "Setup done\n";
+ auto alterAndCheck = [&](ui32 pCount) {
+ server_->AnnoyingClient->AlterTopicNoLegacy(topic, pCount);
+ auto writer = CreateSimpleWriter(*driver, topic, "12345");
+ UNIT_ASSERT_VALUES_EQUAL(writer->GetInitSeqNo(), 0);
+ auto res = writer->Write("test-data", writer->GetInitSeqNo() + 1);
+ UNIT_ASSERT(res);
+ writer->Close();
+ };
+ Y_UNUSED(alterAndCheck);
+ alterAndCheck(2);
+ alterAndCheck(4);
+ alterAndCheck(12);
+// ydbDriver = nullptr;
+ }
+}
+
+} // namespace NKikimr::NPersQueueTests
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index f01e29cc704..51066713fef 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1016,7 +1016,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.ServerSettings.SetEnableSystemViews(false);
server.StartServer();
- server.EnableLogs({ NKikimrServices::PERSQUEUE });
+ server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_INFO);
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 10);
TPQDataWriter writer("source", server);
@@ -1052,9 +1052,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
for (ui32 i = 0; i < 15*5; ++i) {
ss[writer.InitSession("sid_rand_" + ToString<ui32>(i), 0, true)]++;
}
- for (auto &s : ss) {
- Cerr << s.first << " " << s.second << "\n";
+ for (auto& s : ss) {
if (rr) {
+ Cerr << "Round robin check: " << s.first << ":" << s.second << "\n";
UNIT_ASSERT(s.second >= 4 && s.second <= 6);
}
}
@@ -5572,7 +5572,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
const TString& srcId, ui32 partId, ui64 accessTime = 0
) {
TStringBuilder query;
- auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId(topicForHash, srcId);
+ auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId(
+ topicForHash, srcId,
+ NPQ::ESourceIdTableGeneration::SrcIdMeta2
+ );
Cerr << "===save partition with time: " << accessTime << Endl;
if (accessTime == 0) {
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
index 171567ad6e6..b74a1fde917 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
@@ -46,6 +46,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
index 34350a28a93..4671d58d389 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
@@ -49,6 +49,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
index 867553fab21..9853ebec600 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
@@ -51,6 +51,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp