diff options
author | komels <komels@ydb.tech> | 2022-12-23 13:09:14 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2022-12-23 13:09:14 +0300 |
commit | c70e9bd2cc0dae340be5089063ae505f73121172 (patch) | |
tree | 4cdaef9a18cbabf5696dc2e432ea5f5072a769c4 | |
parent | 9c0a33e446c7a410d4f1f80cbea29e070841ddb4 (diff) | |
download | ydb-c70e9bd2cc0dae340be5089063ae505f73121172.tar.gz |
Partition mapping for 1st class
22 files changed, 466 insertions, 61 deletions
diff --git a/ydb/core/persqueue/writer/CMakeLists.darwin.txt b/ydb/core/persqueue/writer/CMakeLists.darwin.txt index 7a525a071b7..5d6a962b334 100644 --- a/ydb/core/persqueue/writer/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/writer/CMakeLists.darwin.txt @@ -20,5 +20,6 @@ target_link_libraries(core-persqueue-writer PUBLIC ) target_sources(core-persqueue-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp ) diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt index f957f84a625..7c52f93cf56 100644 --- a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt @@ -21,5 +21,6 @@ target_link_libraries(core-persqueue-writer PUBLIC ) target_sources(core-persqueue-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp ) diff --git a/ydb/core/persqueue/writer/CMakeLists.linux.txt b/ydb/core/persqueue/writer/CMakeLists.linux.txt index f957f84a625..7c52f93cf56 100644 --- a/ydb/core/persqueue/writer/CMakeLists.linux.txt +++ b/ydb/core/persqueue/writer/CMakeLists.linux.txt @@ -21,5 +21,6 @@ target_link_libraries(core-persqueue-writer PUBLIC ) target_sources(core-persqueue-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/source_id_encoding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/metadata_initializers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/writer/writer.cpp ) diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp new file mode 100644 index 00000000000..651242956d8 --- /dev/null +++ b/ydb/core/persqueue/writer/metadata_initializers.cpp @@ -0,0 +1,69 @@ +#include "metadata_initializers.h" + +namespace NKikimr::NGRpcProxy::V1 { + +using namespace NMetadata; + +void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const { + TVector<NInitializer::ITableModifier::TPtr> result; + auto tablePath = TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(); + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(tablePath); + request.add_primary_key("Hash"); + request.add_primary_key("Topic"); + request.add_primary_key("ProducerId"); + { + auto& column = *request.add_columns(); + column.set_name("Hash"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + { + auto& column = *request.add_columns(); + column.set_name("Topic"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8); + } + { + auto& column = *request.add_columns(); + column.set_name("ProducerId"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8); + } + { + auto& column = *request.add_columns(); + column.set_name("AccessTime"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + { + auto& column = *request.add_columns(); + column.set_name("CreateTime"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + { + auto& column = *request.add_columns(); + column.set_name("Partition"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto* partSettings = request.mutable_partitioning_settings(); + partSettings->add_partition_by("Hash"); + partSettings->set_partitioning_by_size(Ydb::FeatureFlag::ENABLED); + partSettings->set_partitioning_by_load(Ydb::FeatureFlag::DISABLED); + partSettings->set_max_partitions_count(1000); + } + { + auto* ttlSettings = request.mutable_ttl_settings(); + auto* columnTtl = ttlSettings->mutable_value_since_unix_epoch(); + columnTtl->set_column_name("AccessTime"); + columnTtl->set_expire_after_seconds(1382400); + columnTtl->set_column_unit(Ydb::Table::ValueSinceUnixEpochModeSettings::UNIT_MILLISECONDS); + } + + result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create")); + } + result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl")); + controller->PreparationFinished(result); +} + + +} // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/core/persqueue/writer/metadata_initializers.h b/ydb/core/persqueue/writer/metadata_initializers.h new file mode 100644 index 00000000000..aa78cf6c48b --- /dev/null +++ b/ydb/core/persqueue/writer/metadata_initializers.h @@ -0,0 +1,51 @@ +#pragma once + +#include <ydb/services/metadata/service.h> +#include <ydb/services/metadata/abstract/initialization.h> + + +namespace NKikimr::NGRpcProxy::V1 { + +using TInitBehaviourPtr = std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour>; +using TClassBehaviourPtr = std::shared_ptr<NMetadata::IClassBehaviour>; + +class TSrcIdMetaInitializer : public NMetadata::NInitializer::IInitializationBehaviour { +public: + static TInitBehaviourPtr GetInstant() { + static TInitBehaviourPtr res{new TSrcIdMetaInitializer()}; + return res; + } + +protected: + virtual void DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const override; + +private: + TSrcIdMetaInitializer() = default; +}; + +class TSrcIdMetaInitManager : public NMetadata::IClassBehaviour { +protected: + virtual TString GetInternalStorageTablePath() const override { + return "TopicPartitionsMapping"; + } + TInitBehaviourPtr ConstructInitializer() const override { + return TSrcIdMetaInitializer::GetInstant(); + } + std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override { + return nullptr; + } +public: + static TClassBehaviourPtr GetInstant() { + static TClassBehaviourPtr res{new TSrcIdMetaInitManager()}; + return res; + } + virtual TString GetTypeId() const override { + return TypeName<TSrcIdMetaInitManager>(); + } + +private: + TSrcIdMetaInitManager() = default; + +}; + +} // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp index b528da20991..c4eb2752658 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.cpp +++ b/ydb/core/persqueue/writer/source_id_encoding.cpp @@ -1,50 +1,105 @@ #include "source_id_encoding.h" +#include "metadata_initializers.h" #include <library/cpp/string_utils/base64/base64.h> - #include <util/generic/yexception.h> #include <util/string/strip.h> #include <util/string/builder.h> #include <util/string/hex.h> #include <util/digest/murmur.h> +#include <util/digest/city.h> +#include <util/string/join.h> + #include <library/cpp/digest/md5/md5.h> namespace NKikimr { namespace NPQ { -TString GetSourceIdSelectQueryFromPath(const TString& path) { +TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) { TStringBuilder res; - res << "--!syntax_v1\n" - "DECLARE $Hash AS Uint32; " - "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8; " - "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` " - "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;"; + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + res << "--!syntax_v1\n" + "DECLARE $Hash AS Uint32; " + "DECLARE $Topic AS Utf8; " + "DECLARE $SourceId AS Utf8; " + "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` " + "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;"; + break; + case ESourceIdTableGeneration::PartitionMapping: + res << "--!syntax_v1\n" + "DECLARE $Hash AS Uint64; " + "DECLARE $Topic AS Utf8; " + "DECLARE $SourceId AS Utf8; " + "SELECT Partition, CreateTime, AccessTime FROM `" + << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + << "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;"; + break; + default: + Y_FAIL(); + } return res; } -TString GetSourceIdSelectQuery(const TString& root) { - return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2"); +TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) { + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2"); + case ESourceIdTableGeneration::PartitionMapping: + return GetUpdateIdSelectQueryFromPath( + NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + ); + default: + Y_FAIL(); + } } -TString GetUpdateIdSelectQueryFromPath(const TString& path) { +TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) { TStringBuilder res; - res << "--!syntax_v1\n" - "DECLARE $SourceId AS Utf8; " - "DECLARE $Topic AS Utf8; " - "DECLARE $Hash AS Uint32; " - "DECLARE $Partition AS Uint32; " - "DECLARE $CreateTime AS Uint64; " - "DECLARE $AccessTime AS Uint64;\n" - "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + res << "--!syntax_v1\n" + "DECLARE $SourceId AS Utf8; " + "DECLARE $Topic AS Utf8; " + "DECLARE $Hash AS Uint32; " + "DECLARE $Partition AS Uint32; " + "DECLARE $CreateTime AS Uint64; " + "DECLARE $AccessTime AS Uint64;\n" + "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + break; + case ESourceIdTableGeneration::PartitionMapping: + res << "--!syntax_v1\n" + "DECLARE $SourceId AS Utf8; " + "DECLARE $Topic AS Utf8; " + "DECLARE $Hash AS Uint64; " + "DECLARE $Partition AS Uint32; " + "DECLARE $CreateTime AS Uint64; " + "DECLARE $AccessTime AS Uint64;\n" + "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + break; + + default: + Y_FAIL(); + } return res; } -TString GetUpdateIdSelectQuery(const TString& root) { - return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2"); +TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) { + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2"); + case ESourceIdTableGeneration::PartitionMapping: + return GetUpdateIdSelectQueryFromPath( + NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + ); + default: + Y_FAIL(); + } } @@ -128,17 +183,44 @@ bool IsValidEncoded(const TString& sourceId) { } -TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId) { +template <class... TArgs> +ui64 GetKeysHash(TArgs&&... args) { + return CityHash64(Join("#", args...)); +} + +TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation) { TEncodedSourceId res; + res.OriginalSourceId = userSourceId; TString encodedSourceId = Encode(userSourceId); res.EscapedSourceId = HexEncode(encodedSourceId); - - TString s = topic + encodedSourceId; - res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED); + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: { + TString s = topic + encodedSourceId; + res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED); + break; + } + case ESourceIdTableGeneration::PartitionMapping: + res.KeysHash = GetKeysHash(topic, encodedSourceId); + break; + } + res.Generation = generation; return res; } +void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId) { + switch (encodedSrcId.Generation) { + case ESourceIdTableGeneration::PartitionMapping: + parameters["$Hash"] = encodedSrcId.KeysHash; + return; + case ESourceIdTableGeneration::SrcIdMeta2: + parameters["$Hash"] = encodedSrcId.Hash; + return; + + } +} + + } // NSourceIdEncoding } // NPQ } // NKikimr diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h index c48a1c574ec..a879521e610 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.h +++ b/ydb/core/persqueue/writer/source_id_encoding.h @@ -2,15 +2,21 @@ #include <util/generic/fwd.h> #include <util/generic/string.h> +#include <ydb/public/lib/deprecated/kicli/kicli.h> namespace NKikimr { namespace NPQ { -TString GetSourceIdSelectQuery(const TString& root); -TString GetUpdateIdSelectQuery(const TString& root); +enum class ESourceIdTableGeneration { + SrcIdMeta2, + PartitionMapping +}; + +TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); +TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); -TString GetSourceIdSelectQueryFromPath(const TString& path); -TString GetUpdateIdSelectQueryFromPath(const TString& path); +TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); +TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); namespace NSourceIdEncoding { @@ -24,11 +30,16 @@ bool IsValidEncoded(const TString& encodedSourceId); struct TEncodedSourceId { + TString OriginalSourceId; TString EscapedSourceId; ui32 Hash = 0; + ui64 KeysHash = 0; + ESourceIdTableGeneration Generation; }; -TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId); +void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId); + +TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation); } // NSourceIdEncoding } // NPQ diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 23545a9a76c..cb9056121fd 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -109,6 +109,7 @@ message TPQConfig { repeated TChannelProfile ChannelProfiles = 31; optional bool TopicsAreFirstClassCitizen = 32 [default = true]; + optional bool UseSrcIdMetaMappingInFirstClass = 52 [default = false]; optional string SourceIdTablePath = 33 [default = "/Root/PQ/SourceIdMeta2"]; @@ -182,6 +183,7 @@ message TPQConfig { } optional TMoveTopicActorConfig MoveTopicActorConfig = 51; + } message TChannelProfile { diff --git a/ydb/services/lib/sharding/sharding.cpp b/ydb/services/lib/sharding/sharding.cpp index ce0e5a1c0a8..e7084470407 100644 --- a/ydb/services/lib/sharding/sharding.cpp +++ b/ydb/services/lib/sharding/sharding.cpp @@ -4,6 +4,7 @@ #include <util/generic/maybe.h> #include <util/generic/yexception.h> #include <util/string/printf.h> +#include <library/cpp/digest/md5/md5.h> namespace NKikimr::NDataStreams::V1 { @@ -92,4 +93,8 @@ namespace NKikimr::NDataStreams::V1 { NYql::NDecimal::TUint128(1); return {left, right}; } + + ui32 CalculateShardFromSrcId(const TString& sourceId, ui32 partitionToTablet) { + return ShardFromDecimal(HexBytesToDecimal(MD5::Calc(sourceId)), partitionToTablet); + } } diff --git a/ydb/services/lib/sharding/sharding.h b/ydb/services/lib/sharding/sharding.h index 257ee8d2be7..bf78f690321 100644 --- a/ydb/services/lib/sharding/sharding.h +++ b/ydb/services/lib/sharding/sharding.h @@ -18,4 +18,5 @@ namespace NKikimr::NDataStreams::V1 { NYql::NDecimal::TUint128 BytesToDecimal(const TString &bytes); bool IsValidDecimal(const TString& bytes); TString Uint128ToDecimalString(NYql::NDecimal::TUint128 decimal, const NYql::NDecimal::TUint128& base = 10); + ui32 CalculateShardFromSrcId(const TString& sourceId, ui32 partitionToTablet); } diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt index e65325c51d9..182006ca30c 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin.txt @@ -25,6 +25,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC api-protos public-lib-base services-lib-actors + services-lib-sharding + ydb-services-metadata ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt index c584633e136..be5980b588d 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt @@ -26,6 +26,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC api-protos public-lib-base services-lib-actors + services-lib-sharding + ydb-services-metadata ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt index c584633e136..be5980b588d 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux.txt @@ -26,6 +26,8 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC api-protos public-lib-base services-lib-actors + services-lib-sharding + ydb-services-metadata ) target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/codecs.cpp diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp index b020d327986..d4db92ba16e 100644 --- a/ydb/services/persqueue_v1/actors/helpers.cpp +++ b/ydb/services/persqueue_v1/actors/helpers.cpp @@ -29,4 +29,19 @@ bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) { return !data.partition_data().empty(); } +TMaybe<ui32> GetPartitionFromConfigOptions( + ui32 preferred, const NPQ::NSourceIdEncoding::TEncodedSourceId& encodedSrcId, + ui32 partPerTablet, bool firstClass, bool useRoundRobin +) { + TMaybe<ui32> ret; + if (preferred < Max<ui32>()) { + ret = preferred; + } else if (!useRoundRobin){ + ret = encodedSrcId.Hash % partPerTablet; + } else if (firstClass) { + ret = NKikimr::NDataStreams::V1::CalculateShardFromSrcId(encodedSrcId.OriginalSourceId, partPerTablet); + } + return ret; +} + } diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h index 98ff39639ea..812749d0095 100644 --- a/ydb/services/persqueue_v1/actors/helpers.h +++ b/ydb/services/persqueue_v1/actors/helpers.h @@ -2,6 +2,8 @@ #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> #include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> +#include <ydb/services/lib/sharding/sharding.h> namespace NKikimr::NGRpcProxy::V1 { @@ -11,4 +13,6 @@ bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::Dat bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data); +TMaybe<ui32> GetPartitionFromConfigOptions(ui32 preferred, const NPQ::NSourceIdEncoding::TEncodedSourceId& encodedSrcId, + ui32 partPerTablet, bool firstClass, bool useRoundRobin); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 0802f359323..1c08ee1853c 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -14,6 +14,7 @@ #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/services/lib/actors/pq_rl_helpers.h> +#include <ydb/services/metadata/service.h> namespace NKikimr::NGRpcProxy::V1 { @@ -122,6 +123,7 @@ private: HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); + HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); default: break; @@ -141,6 +143,8 @@ private: void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx); void TryCloseSession(const TActorContext& ctx); + void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr &ev, const NActors::TActorContext& ctx); + void CheckACL(const TActorContext& ctx); void RecheckACL(const TActorContext& ctx); // Requests fresh ACL from 'SchemeCache' @@ -153,14 +157,18 @@ private: void Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx); void LogSession(const TActorContext& ctx); void InitAfterDiscovery(const TActorContext& ctx); + ui32 CalculateFirstClassPartition(const TActorContext& ctx); + void SendCreateManagerRequest(const TActorContext& ctx); void DiscoverPartition(const NActors::TActorContext& ctx); - void SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx); + void StartSession(const NActors::TActorContext& ctx); + void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx); void UpdatePartition(const NActors::TActorContext& ctx); void RequestNextPartition(const NActors::TActorContext& ctx); void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx); - THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(ui32 hash, const TString& topic, + + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(const TString& topic, const NActors::TActorContext& ctx); void SendUpdateSrcIdsRequests(const TActorContext& ctx); //void InitCheckACL(const TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index e95cf8a00ae..de0f4187812 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -2,10 +2,13 @@ #error "Do not include this file directly" #endif -#include <ydb/library/persqueue/topic_parser/counters.h> - #include "codecs.h" +#include "helpers.h" +#include <ydb/services/metadata/manager/common.h> +#include <ydb/core/persqueue/writer/metadata_initializers.h> + +#include <ydb/library/persqueue/topic_parser/counters.h> #include <ydb/core/persqueue/pq_database.h> #include <ydb/core/persqueue/write_meta.h> #include <ydb/core/protos/services.pb.h> @@ -20,6 +23,7 @@ #include <util/string/escape.h> #include <util/string/printf.h> + using namespace NActors; using namespace NKikimrClient; @@ -204,8 +208,13 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ctx) { Y_VERIFY(Request); - SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath()); - UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath()); + //ToDo !! - Set proper table paths. + const auto& pqConfig = AppData(ctx)->PQConfig; + ESourceIdTableGeneration gen = pqConfig.GetTopicsAreFirstClassCitizen() ? + ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2; + SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen); + UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen); + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "Select srcid query: " << SelectSourceIdQuery); Request->GetStreamCtx()->Attach(ctx.SelfID); if (!Request->GetStreamCtx()->Read()) { @@ -464,8 +473,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) { + ESourceIdTableGeneration gen = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? + ESourceIdTableGeneration::PartitionMapping + : + ESourceIdTableGeneration::SrcIdMeta2; try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId); + EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, gen); } catch (yexception& e) { CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); return; @@ -622,14 +635,31 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse: } template<bool UseMigrationProtocol> +ui32 TWriteSessionActor<UseMigrationProtocol>::CalculateFirstClassPartition(const TActorContext&) { + return NKikimr::NDataStreams::V1::ShardFromDecimal( + NKikimr::NDataStreams::V1::HexBytesToDecimal(MD5::Calc(SourceId)), PartitionToTablet.size() + ); +} + +template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::TActorContext& ctx) { + const auto &pqConfig = AppData(ctx)->PQConfig; + if (pqConfig.GetTopicsAreFirstClassCitizen()) { + if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { + return SendCreateManagerRequest(ctx); + } + auto partitionId = PreferedPartition < Max<ui32>() ? PreferedPartition : CalculateFirstClassPartition(ctx); - if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration] - separate flag for having config tables - auto partitionId = PreferedPartition < Max<ui32>() ? PreferedPartition - : NKikimr::NDataStreams::V1::ShardFromDecimal(NKikimr::NDataStreams::V1::HexBytesToDecimal(MD5::Calc(SourceId)), PartitionToTablet.size()); ProceedPartition(partitionId, ctx); return; } + else { + StartSession(ctx); + } +} + +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::StartSession(const NActors::TActorContext& ctx) { auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); @@ -638,6 +668,20 @@ void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors:: State = ES_WAIT_SESSION; } +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::SendCreateManagerRequest(const TActorContext& ctx) { + ctx.Send( + NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), + new NMetadata::NProvider::TEvPrepareManager(TSrcIdMetaInitManager::GetInstant()) + ); +} + +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::Handle( + NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx +) { + StartSession(ctx); +} template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx) @@ -664,11 +708,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSes Y_VERIFY(!KqpSessionId.empty()); - SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx); + SendSelectPartitionRequest(FullConverter->GetClientsideName(), ctx); } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx) { +void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx) { //read from DS auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -682,8 +726,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(ui32 h // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); NClient::TParameters parameters; + SetHashToTxParams(parameters, EncodedSourceId); - parameters["$Hash"] = hash; parameters["$Topic"] = topic; parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; @@ -729,7 +773,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvGetPartit template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record.GetRef(); - + const auto& pqConfig = AppData(ctx)->PQConfig; if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying"); @@ -772,7 +816,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp return; } if (PartitionFound && PreferedPartition < Max<ui32>() && Partition != PreferedPartition) { - CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.", + CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " + << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) + << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " + "another MessageGroupId, specify PartitionGroupId " << (Partition + 1) + << ", or do not specify PartitionGroupId at all.", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } @@ -784,11 +832,15 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " partition " << Partition << " partitions " << PartitionToTablet.size() << "(" << EncodedSourceId.Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); - if (!PartitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) { - Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : EncodedSourceId.Hash % PartitionToTablet.size(); //choose partition default value - PartitionFound = true; + if (!PartitionFound) { + auto partition = GetPartitionFromConfigOptions(PreferedPartition, EncodedSourceId, PartitionToTablet.size(), + pqConfig.GetTopicsAreFirstClassCitizen(), + pqConfig.GetRoundRobinPartitionMapping()); + if (partition.Defined()) { + PartitionFound = true; + Partition = *partition; + } } - if (PartitionFound) { UpdatePartition(ctx); } else { @@ -815,7 +867,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp template<bool UseMigrationProtocol> THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>::MakeUpdateSourceIdMetadataRequest( - ui32 hash, const TString& topic, const NActors::TActorContext& ctx + const TString& topic, const NActors::TActorContext& ctx ) { auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); @@ -839,7 +891,8 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>: ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); NClient::TParameters parameters; - parameters["$Hash"] = hash; + SetHashToTxParams(parameters, EncodedSourceId); + //parameters["$Hash"] = hash; parameters["$Topic"] = topic; parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; @@ -853,11 +906,10 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>: template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::SendUpdateSrcIdsRequests(const TActorContext& ctx) { - { - auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx); - SourceIdUpdatesInflight++; - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } + auto ev = MakeUpdateSourceIdMetadataRequest(FullConverter->GetClientsideName(), ctx); + + SourceIdUpdatesInflight++; + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } template<bool UseMigrationProtocol> diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp new file mode 100644 index 00000000000..7d15e63eb83 --- /dev/null +++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp @@ -0,0 +1,90 @@ +#include <ydb/services/persqueue_v1/ut/test_utils.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> + +namespace NKikimr::NPersQueueTests { +using namespace Tests; +using namespace NPersQueue; + + +Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) { + THolder<NYdb::TDriver> GetDriver(TTestServer& server) { + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort); + return THolder<NYdb::TDriver>(new NYdb::TDriver(driverCfg)); + } + + std::pair<THolder<TTestServer>, THolder<NYdb::TDriver>> Setup(const TString& topic, bool useMapping) { + auto settings = PQSettings(0); + settings.PQConfig.SetTopicsAreFirstClassCitizen(true); + settings.PQConfig.SetUseSrcIdMetaMappingInFirstClass(useMapping); + auto server = MakeHolder<TTestServer>(settings); + + server->AnnoyingClient->CreateTopicNoLegacy(topic, 1); + auto ydbDriver = GetDriver(*server); + { + auto writer = CreateSimpleWriter(*ydbDriver, topic, "123", 0, "raw"); + Cerr << "Created writer (initially).\n"; + for (auto i = 0; i < 10; ++i) { + writer->Write("test-message", i + 1); + } + Cerr << "Writes done\n"; + writer->Close(); + Cerr << "Writer closed\n"; + } + return {std::move(server), std::move(ydbDriver)}; + } + + Y_UNIT_TEST(TestTableCreated) { + TString topic = "/Root/topic-f1"; + auto [server, ydbDriver] = Setup(topic, true); auto& server_ = server; auto& driver = ydbDriver; + + auto checkMetaTable = [&](ui64 expectedCount) { + TStringBuilder query; + query << "select * from `/Root/.metadata/TopicPartitionsMapping`"; + auto resultSet = server_->AnnoyingClient->RunYqlDataQuery(query); + UNIT_ASSERT(resultSet.Defined()); + NYdb::TResultSetParser parser(*resultSet); + + UNIT_ASSERT_VALUES_EQUAL(resultSet->RowsCount(), expectedCount); + return resultSet->RowsCount(); + }; + checkMetaTable(1); + server->AnnoyingClient->SetNoConfigMode(); + ui64 currSeqNo = 10; + auto alterAndCheck = [&](ui32 pCount) { + server_->AnnoyingClient->AlterTopicNoLegacy(topic, pCount); + auto writer = CreateSimpleWriter(*driver, topic, "123", 0, "raw"); + UNIT_ASSERT_VALUES_EQUAL(writer->GetInitSeqNo(), currSeqNo); + writer->Write("test-data", ++currSeqNo); + writer->Close(); + checkMetaTable(1); + }; + alterAndCheck(5); + alterAndCheck(10); + alterAndCheck(15); + ydbDriver = nullptr; + } + + Y_UNIT_TEST(NoMapping) { + TString topic = "/Root/topic-f2"; + auto [server, ydbDriver] = Setup(topic, false); auto& server_ = server; auto& driver = ydbDriver; + Cerr << "Setup done\n"; + auto alterAndCheck = [&](ui32 pCount) { + server_->AnnoyingClient->AlterTopicNoLegacy(topic, pCount); + auto writer = CreateSimpleWriter(*driver, topic, "12345"); + UNIT_ASSERT_VALUES_EQUAL(writer->GetInitSeqNo(), 0); + auto res = writer->Write("test-data", writer->GetInitSeqNo() + 1); + UNIT_ASSERT(res); + writer->Close(); + }; + Y_UNUSED(alterAndCheck); + alterAndCheck(2); + alterAndCheck(4); + alterAndCheck(12); +// ydbDriver = nullptr; + } +} + +} // namespace NKikimr::NPersQueueTests diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index f01e29cc704..51066713fef 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1016,7 +1016,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.ServerSettings.SetEnableSystemViews(false); server.StartServer(); - server.EnableLogs({ NKikimrServices::PERSQUEUE }); + server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_INFO); server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 10); TPQDataWriter writer("source", server); @@ -1052,9 +1052,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { for (ui32 i = 0; i < 15*5; ++i) { ss[writer.InitSession("sid_rand_" + ToString<ui32>(i), 0, true)]++; } - for (auto &s : ss) { - Cerr << s.first << " " << s.second << "\n"; + for (auto& s : ss) { if (rr) { + Cerr << "Round robin check: " << s.first << ":" << s.second << "\n"; UNIT_ASSERT(s.second >= 4 && s.second <= 6); } } @@ -5572,7 +5572,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { const TString& srcId, ui32 partId, ui64 accessTime = 0 ) { TStringBuilder query; - auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId(topicForHash, srcId); + auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId( + topicForHash, srcId, + NPQ::ESourceIdTableGeneration::SrcIdMeta2 + ); Cerr << "===save partition with time: " << accessTime << Endl; if (accessTime == 0) { diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt index 171567ad6e6..b74a1fde917 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt @@ -46,6 +46,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt index 34350a28a93..4671d58d389 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt @@ -49,6 +49,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt index 867553fab21..9853ebec600 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt @@ -51,6 +51,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp |