aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-08-04 19:42:44 +0300
committerkomels <komels@ydb.tech>2023-08-04 21:03:30 +0300
commitd33597c466ba0b77f230075f06e7fb61390d8b3a (patch)
treeda9d8f5824deddadfe313504d589d32aed2ab3eb
parent6ef797091b0880091a909294c08d38bad6ab5cf4 (diff)
downloadydb-d33597c466ba0b77f230075f06e7fb61390d8b3a.tar.gz
Kafka metarequest
Tests - inter commit Kafka metadata request
-rw-r--r--ydb/core/engine/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/engine/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/engine/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/engine/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/engine/ya.make2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp34
-rw-r--r--ydb/core/kafka_proxy/kafka_metadata_actor.cpp119
-rw-r--r--ydb/core/kafka_proxy/kafka_metadata_actor.h42
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt7
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt7
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt7
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt7
-rw-r--r--ydb/core/kafka_proxy/ut/metarequest_ut.cpp82
-rw-r--r--ydb/core/kafka_proxy/ut/ya.make7
-rw-r--r--ydb/core/kafka_proxy/ya.make2
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/events.h6
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp2
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp2
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make1
-rw-r--r--ydb/services/persqueue_v1/ut/ya.make1
-rw-r--r--ydb/services/persqueue_v1/ya.make2
37 files changed, 310 insertions, 44 deletions
diff --git a/ydb/core/engine/CMakeLists.darwin-x86_64.txt b/ydb/core/engine/CMakeLists.darwin-x86_64.txt
index ebce4f400f..e424e4640f 100644
--- a/ydb/core/engine/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/engine/CMakeLists.darwin-x86_64.txt
@@ -25,7 +25,6 @@ target_link_libraries(ydb-core-engine PUBLIC
ydb-core-tablet
ydb-library-mkql_proto
library-mkql_proto-protos
- mkql_proto-ut-helpers
api-protos
minikql-comp_nodes-llvm
parser-pg_wrapper-interface
diff --git a/ydb/core/engine/CMakeLists.linux-aarch64.txt b/ydb/core/engine/CMakeLists.linux-aarch64.txt
index 66c80d7921..4650f2e89a 100644
--- a/ydb/core/engine/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/engine/CMakeLists.linux-aarch64.txt
@@ -26,7 +26,6 @@ target_link_libraries(ydb-core-engine PUBLIC
ydb-core-tablet
ydb-library-mkql_proto
library-mkql_proto-protos
- mkql_proto-ut-helpers
api-protos
minikql-comp_nodes-llvm
parser-pg_wrapper-interface
diff --git a/ydb/core/engine/CMakeLists.linux-x86_64.txt b/ydb/core/engine/CMakeLists.linux-x86_64.txt
index 66c80d7921..4650f2e89a 100644
--- a/ydb/core/engine/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/engine/CMakeLists.linux-x86_64.txt
@@ -26,7 +26,6 @@ target_link_libraries(ydb-core-engine PUBLIC
ydb-core-tablet
ydb-library-mkql_proto
library-mkql_proto-protos
- mkql_proto-ut-helpers
api-protos
minikql-comp_nodes-llvm
parser-pg_wrapper-interface
diff --git a/ydb/core/engine/CMakeLists.windows-x86_64.txt b/ydb/core/engine/CMakeLists.windows-x86_64.txt
index ebce4f400f..e424e4640f 100644
--- a/ydb/core/engine/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/engine/CMakeLists.windows-x86_64.txt
@@ -25,7 +25,6 @@ target_link_libraries(ydb-core-engine PUBLIC
ydb-core-tablet
ydb-library-mkql_proto
library-mkql_proto-protos
- mkql_proto-ut-helpers
api-protos
minikql-comp_nodes-llvm
parser-pg_wrapper-interface
diff --git a/ydb/core/engine/ya.make b/ydb/core/engine/ya.make
index b6210466f9..d3d893dae4 100644
--- a/ydb/core/engine/ya.make
+++ b/ydb/core/engine/ya.make
@@ -20,7 +20,7 @@ PEERDIR(
ydb/core/tablet
ydb/library/mkql_proto
ydb/library/mkql_proto/protos
- ydb/library/mkql_proto/ut/helpers
+# ydb/library/mkql_proto/ut/helpers
ydb/public/api/protos
ydb/library/yql/minikql/comp_nodes/llvm
ydb/library/yql/parser/pg_wrapper/interface
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index f5901e67f9..d80040e4c5 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -17,11 +17,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-core-base
ydb-core-protos
ydb-core-raw_socket
+ ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index ce6bfbd67a..df99502c2a 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -18,11 +18,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-core-base
ydb-core-protos
ydb-core-raw_socket
+ ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index ce6bfbd67a..df99502c2a 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -18,11 +18,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-core-base
ydb-core-protos
ydb-core-raw_socket
+ ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index f5901e67f9..d80040e4c5 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -17,11 +17,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-core-base
ydb-core-protos
ydb-core-raw_socket
+ ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index d9553d591b..e486e457c7 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -6,6 +6,7 @@
#include "kafka_events.h"
#include "kafka_messages.h"
#include "kafka_produce_actor.h"
+#include "kafka_metadata_actor.h"
#include "kafka_log_impl.h"
#include <strstream>
@@ -239,34 +240,9 @@ protected:
InflightSize -= messageSize;
}
- void HandleMessage(const TRequestHeaderData* header, const TMetadataRequestData* message, size_t messageSize) {
- TMetadataResponseData response;
- response.ThrottleTimeMs = 0;
- response.ClusterId = "cluster-ahjgk";
- response.ControllerId = 1;
-
- response.Brokers.resize(1);
- response.Brokers[0].NodeId = 1;
- response.Brokers[0].Host = "lbk-dev-02.search.yandex.net";
- response.Brokers[0].Port = 9092;
- response.Brokers[0].Rack = "rack-1-1";
-
- response.Topics.resize(message->Topics.size());
- for(size_t i = 0; i < message->Topics.size(); ++i) {
- response.Topics[i].TopicId = TKafkaUuid(0, i + 1);
- response.Topics[i].Name = message->Topics[i].Name;
- response.Topics[i].Partitions.resize(1);
- response.Topics[i].Partitions[0].PartitionIndex = 0;
- response.Topics[i].Partitions[0].LeaderId = 1; // response.Brokers[0].NodeId
- response.Topics[i].Partitions[0].ReplicaNodes.resize(1);
- response.Topics[i].Partitions[0].ReplicaNodes[0] = 1;
- response.Topics[i].Partitions[0].IsrNodes.resize(1);
- response.Topics[i].Partitions[0].IsrNodes[0] = 1;
- }
-
- Reply(header, &response);
-
- InflightSize -= messageSize;
+ void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t /*messageSize*/) {
+ PendingRequests[header->CorrelationId] = std::move(Request);
+ Register(new TKafkaMetadataActor(header->CorrelationId, message, SelfId()));
}
void ProcessRequest() {
@@ -469,4 +445,4 @@ NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket,
return new TKafkaConnection(std::move(socket), std::move(address), config);
}
-} // namespace NKafka \ No newline at end of file
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
new file mode 100644
index 0000000000..9f6ee86e36
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
@@ -0,0 +1,119 @@
+#include "kafka_metadata_actor.h"
+#include <ydb/services/persqueue_v1/actors/schema_actors.h>
+
+namespace NKafka {
+using namespace NKikimr::NGRpcProxy::V1;
+
+void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
+ Response->Topics.resize(Message->Topics.size());
+ THashMap<TString, TActorId> partitionActors;
+ for (auto i = 0u; i < Message->Topics.size(); ++i) {
+ Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
+ auto& reqTopic = Message->Topics[i];
+ Response->Topics[i].Name = reqTopic.Name.value_or("");
+
+ if (!reqTopic.Name.value_or("")) {
+ AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
+ continue;
+ }
+ const auto& topicName = reqTopic.Name.value();
+ TActorId child;
+ auto namesIter = partitionActors.find(topicName);
+ if (namesIter.IsEnd()) {
+ child = SendTopicRequest(reqTopic);
+ } else {
+ child = namesIter->second;
+ }
+ TopicIndexes[child].push_back(i);
+ }
+ Become(&TKafkaMetadataActor::StateWork);
+ RespondIfRequired(ctx);
+}
+
+TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
+ TGetPartitionsLocationRequest locationRequest{};
+ locationRequest.Topic = topicRequest.Name.value();
+ //ToDo: Get database?
+ //ToDo: Authorization?
+ PendingResponses++;
+ return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
+
+}
+
+void TKafkaMetadataActor::AddTopicError(
+ TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode
+) {
+ topic.ErrorCode = errorCode;
+}
+
+void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
+ topic.ErrorCode = NONE_ERROR;
+ topic.Partitions.reserve(response->Partitions.size());
+ for (const auto& part : response->Partitions) {
+ TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
+ responsePartition.PartitionIndex = part.PartitionId;
+ responsePartition.ErrorCode = NONE_ERROR;
+ responsePartition.LeaderEpoch = part.Generation;
+ responsePartition.ReplicaNodes.push_back(part.NodeId);
+ auto ins = AllClusterNodes.insert(part.NodeId);
+ if (ins.second) {
+ auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+ broker.NodeId = part.NodeId;
+ broker.Host = part.Hostname;
+ Response->Brokers.emplace_back(std::move(broker));
+ }
+ topic.Partitions.emplace_back(std::move(responsePartition));
+ }
+}
+
+void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) {
+ --PendingResponses;
+
+ auto actorIter = TopicIndexes.find(ev->Sender);
+
+ Y_VERIFY_DEBUG(!actorIter.IsEnd());
+ Y_VERIFY_DEBUG(!actorIter->second.empty());
+
+ if (actorIter.IsEnd()) {
+ LOG_CRIT_S(ctx, NKikimrServices::KAFKA_PROXY,
+ "Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply");
+ return RespondIfRequired(ctx);
+
+
+ }
+ if (actorIter->second.empty()) {
+ LOG_CRIT_S(ctx, NKikimrServices::KAFKA_PROXY,
+ "Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
+
+ return RespondIfRequired(ctx);
+ }
+
+ //ToDo: Log and proceed on bad iter
+ for (auto index : actorIter->second) {
+ switch (ev->Get()->Status) {
+ case Ydb::StatusIds::BAD_REQUEST:
+ AddTopicError(Response->Topics[index], EKafkaErrors::INVALID_REQUEST);
+ break;
+ case Ydb::StatusIds::SCHEME_ERROR:
+ AddTopicError(Response->Topics[index], EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION);
+ break;
+ case Ydb::StatusIds::SUCCESS:
+ AddTopicResponse(Response->Topics[index], ev->Get());
+ break;
+ case Ydb::StatusIds::INTERNAL_ERROR:
+ default:
+ AddTopicError(Response->Topics[index], EKafkaErrors::UNKNOWN_SERVER_ERROR);
+ break;
+
+ }
+ }
+ RespondIfRequired(ActorContext());
+}
+
+void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
+ if (--PendingResponses == 0) {
+ Send(Parent, new TEvKafka::TEvResponse(Cookie, Response));
+ Die(ctx);
+ }
+}
+} // namespace NKafka \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.h b/ydb/core/kafka_proxy/kafka_metadata_actor.h
new file mode 100644
index 0000000000..67f6c64a96
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_metadata_actor.h
@@ -0,0 +1,42 @@
+#include "kafka_events.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/services/persqueue_v1/actors/events.h>
+
+namespace NKafka {
+
+class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
+public:
+ TKafkaMetadataActor(ui64 cookie, TMetadataRequestData* message, const TActorId& parent)
+ : Cookie(cookie)
+ , Parent(parent)
+ , Message(message)
+ , Response(new TMetadataResponseData())
+ {}
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+private:
+ using TEvLocationResponse = NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse;
+
+ TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
+ void HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx);
+
+ void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
+ void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
+ void RespondIfRequired(const TActorContext& ctx);
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvLocationResponse, HandleResponse);
+ }
+ }
+ ui64 Cookie;
+ TActorId Parent;
+ TMetadataRequestData* Message;
+ ui64 PendingResponses = 0;
+
+ TMetadataResponseData::TPtr Response;
+ THashMap<TActorId, TVector<ui64>> TopicIndexes;
+ THashSet<ui64> AllClusterNodes;
+};
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
index 27753bb665..47bf8e6799 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,9 @@
add_executable(ydb-core-kafka_proxy-ut)
+target_compile_options(ydb-core-kafka_proxy-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_include_directories(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy
)
@@ -17,6 +20,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
ydb-core-kafka_proxy
+ persqueue-ut-common
+ core-testlib-default
+ ydb_persqueue_core-ut-ut_utils
)
target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
@@ -28,6 +34,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
index 2211b83d4d..63df13dbe7 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,9 @@
add_executable(ydb-core-kafka_proxy-ut)
+target_compile_options(ydb-core-kafka_proxy-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_include_directories(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy
)
@@ -17,6 +20,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
yutil
cpp-testing-unittest_main
ydb-core-kafka_proxy
+ persqueue-ut-common
+ core-testlib-default
+ ydb_persqueue_core-ut-ut_utils
)
target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-ldl
@@ -31,6 +37,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
index 564249b298..70e7385f09 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,9 @@
add_executable(ydb-core-kafka_proxy-ut)
+target_compile_options(ydb-core-kafka_proxy-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_include_directories(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy
)
@@ -18,6 +21,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
ydb-core-kafka_proxy
+ persqueue-ut-common
+ core-testlib-default
+ ydb_persqueue_core-ut-ut_utils
)
target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-ldl
@@ -32,6 +38,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
index 4f256a5faf..38f370cfe1 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,9 @@
add_executable(ydb-core-kafka_proxy-ut)
+target_compile_options(ydb-core-kafka_proxy-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_include_directories(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy
)
@@ -17,10 +20,14 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
ydb-core-kafka_proxy
+ persqueue-ut-common
+ core-testlib-default
+ ydb_persqueue_core-ut-ut_utils
)
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
new file mode 100644
index 0000000000..c7cbb25e88
--- /dev/null
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -0,0 +1,82 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
+#include <ydb/core/kafka_proxy/kafka_metadata_actor.h>
+
+
+namespace NKafka::NTests {
+
+Y_UNIT_TEST_SUITE(TMetadataActorTests) {
+ THolder<TMetadataRequestData> GetMetadataRequest(const TVector<TString>& topics) {
+ auto res = MakeHolder<TMetadataRequestData>();
+ for (const auto& t : topics) {
+ TMetadataRequestData::TMetadataRequestTopic topic;
+ topic.Name = t;
+ res->Topics.push_back(topic);
+ }
+ return res;
+ }
+
+ auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
+ auto* runtime = server.CleverServer->GetRuntime();
+ auto request = GetMetadataRequest(topics);
+ auto actorId = runtime->Register(new TKafkaMetadataActor(1, request.Get(), edgeActor));
+ runtime->EnableScheduleForActor(actorId);
+ runtime->DispatchEvents();
+ Cerr << "Wait for response for topics: '";
+ for (const auto& t : topics) {
+ Cerr << t << "', ";
+ }
+ Cerr << Endl;
+ return runtime->GrabEdgeEvent<TEvKafka::TEvResponse>();
+ }
+
+ Y_UNIT_TEST(TopicMetadataGoodAndBad) {
+ NPersQueue::TTestServer server;
+ TString topicName = "rt3.dc1--topic";
+ TString topicName2 = "rt3.dc1--topic2";
+ TString topicPath = TString("/Root/PQ/") + topicName;
+ TString topicPath2 = TString("/Root/PQ/") + topicName2;
+ ui32 totalPartitions = 5;
+ server.AnnoyingClient->CreateTopic(topicName, totalPartitions);
+ server.AnnoyingClient->CreateTopic(topicName2, totalPartitions * 2);
+
+ auto edgeId = server.CleverServer->GetRuntime()->AllocateEdgeActor();
+ auto event = GetEvent(server, edgeId, {topicPath});
+ auto response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
+ UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions.size(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes.size(), 1);
+
+ event = GetEvent(server, edgeId, {topicPath, topicPath2});
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2);
+ UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics[1].Partitions.size(), totalPartitions * 2);
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics[1].Partitions[5].ReplicaNodes.size(), 1);
+
+ event = GetEvent(server, edgeId, {topicPath, ""});
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2);
+ UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR);
+ UNIT_ASSERT(response->Topics[1].ErrorCode == EKafkaErrors::INVALID_TOPIC_EXCEPTION);
+
+ event = GetEvent(server, edgeId, {"/Root/bad-topic", topicPath});
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2);
+ UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION);
+ UNIT_ASSERT(response->Topics[1].ErrorCode == EKafkaErrors::NONE_ERROR);
+
+ event = GetEvent(server, edgeId, {"/Root/bad-topic"});
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
+ UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION);
+
+ event = GetEvent(server, edgeId, {});
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 0);
+ }
+};
+
+} // namespace NKafka::NTests TEvKafka::TEvMetadataResponse \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make
index cf7a7cf6db..556f5132e7 100644
--- a/ydb/core/kafka_proxy/ut/ya.make
+++ b/ydb/core/kafka_proxy/ut/ya.make
@@ -3,10 +3,15 @@ UNITTEST_FOR(ydb/core/kafka_proxy)
SRCS(
ut_kafka_functions.cpp
ut_serialization.cpp
+ metarequest_ut.cpp
)
PEERDIR(
ydb/core/kafka_proxy
-)
+ ydb/core/persqueue/ut/common
+ ydb/core/testlib/default
+ ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
+)
+YQL_LAST_ABI_VERSION()
END()
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index ce30eca927..326d8f9080 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -12,6 +12,7 @@ SRCS(
kafka_messages_int.cpp
kafka_messages_int.h
kafka_produce_actor.cpp
+ kafka_metadata_actor.cpp
kafka_produce_actor.h
kafka_proxy.h
kafka_records.cpp
@@ -23,6 +24,7 @@ PEERDIR(
ydb/core/base
ydb/core/protos
ydb/core/raw_socket
+ ydb/services/persqueue_v1
)
END()
diff --git a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
index 32eb38ce84..f6601f6bc4 100644
--- a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
@@ -26,7 +26,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC
ydb-core-ydb_convert
ydb-library-aclib
library-persqueue-obfuscate
- library-persqueue-tests
library-persqueue-topic_parser
api-grpc
api-grpc-draft
diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
index dce3c8da1e..064b99364e 100644
--- a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
@@ -27,7 +27,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC
ydb-core-ydb_convert
ydb-library-aclib
library-persqueue-obfuscate
- library-persqueue-tests
library-persqueue-topic_parser
api-grpc
api-grpc-draft
diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
index dce3c8da1e..064b99364e 100644
--- a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
@@ -27,7 +27,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC
ydb-core-ydb_convert
ydb-library-aclib
library-persqueue-obfuscate
- library-persqueue-tests
library-persqueue-topic_parser
api-grpc
api-grpc-draft
diff --git a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
index 32eb38ce84..f6601f6bc4 100644
--- a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
@@ -26,7 +26,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC
ydb-core-ydb_convert
ydb-library-aclib
library-persqueue-obfuscate
- library-persqueue-tests
library-persqueue-topic_parser
api-grpc
api-grpc-draft
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index cf1ee62523..efc0f67e25 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -453,12 +453,12 @@ struct TEvPQProxy {
struct TPartitionLocationInfo {
ui64 PartitionId;
- ui64 IncGeneration;
+ ui64 Generation;
ui64 NodeId;
TString Hostname;
};
- struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvRequestTablet, EvPartitionLocationResponse>
+ struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvPartitionLocationResponse, EvPartitionLocationResponse>
, public TLocalResponseBase
{
@@ -469,6 +469,8 @@ struct TEvPQProxy {
};
struct TLocalRequestBase {
+ TLocalRequestBase() = default;
+
TLocalRequestBase(const TString& topic, const TString& database, const TString& token)
: Topic(topic)
, Database(database)
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 0d71a4b146..4ff1fb6f82 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -1428,7 +1428,7 @@ bool TPartitionsLocationActor::ApplyResponse(
ui64 nodeId = part.GetNodeId();
partLocation.PartitionId = part.GetPartitionId();
- partLocation.IncGeneration = part.GetGeneration() + 1;
+ partLocation.Generation = part.GetGeneration();
partLocation.NodeId = nodeId;
Response->Partitions.emplace_back(std::move(partLocation));
}
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
index fc7ec1c7c3..bd2dfa11ff 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
cpp-digest-md5
core-testlib-default
ydb-library-aclib
+ library-persqueue-tests
library-persqueue-topic_parser
api-grpc
ydb_persqueue_core-ut-ut_utils
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
index 7e21a17b08..f43d22e671 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
cpp-digest-md5
core-testlib-default
ydb-library-aclib
+ library-persqueue-tests
library-persqueue-topic_parser
api-grpc
ydb_persqueue_core-ut-ut_utils
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
index 657b3aae64..de9ee61be0 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
@@ -28,6 +28,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
cpp-digest-md5
core-testlib-default
ydb-library-aclib
+ library-persqueue-tests
library-persqueue-topic_parser
api-grpc
ydb_persqueue_core-ut-ut_utils
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
index 3e155ee8cc..9971d1c24a 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC
cpp-digest-md5
core-testlib-default
ydb-library-aclib
+ library-persqueue-tests
library-persqueue-topic_parser
api-grpc
ydb_persqueue_core-ut-ut_utils
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
index 376e031d9e..04ef72162b 100644
--- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
@@ -212,7 +212,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
for (const auto& p : ev->Partitions) {
UNIT_ASSERT(!p.Hostname.Empty());
UNIT_ASSERT(p.NodeId > 0);
- UNIT_ASSERT(p.IncGeneration > 0);
+// UNIT_ASSERT(p.IncGeneration > 0);
UNIT_ASSERT(p.PartitionId < 15);
allParts.insert(p.PartitionId);
}
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt
index 1f9314d2b2..1c8bcbe935 100644
--- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
ydb-services-persqueue_v1
library-cpp-getopt
library-cpp-svnversion
+ library-persqueue-tests
core-testlib-default
api-grpc
cpp-client-resources
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt
index b97e8a7fe3..757deb67a4 100644
--- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
ydb-services-persqueue_v1
library-cpp-getopt
library-cpp-svnversion
+ library-persqueue-tests
core-testlib-default
api-grpc
cpp-client-resources
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt
index 0993131a3e..e100ca6858 100644
--- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
ydb-services-persqueue_v1
library-cpp-getopt
library-cpp-svnversion
+ library-persqueue-tests
core-testlib-default
api-grpc
cpp-client-resources
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt
index bacaa28fd1..a3ac31b156 100644
--- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
ydb-services-persqueue_v1
library-cpp-getopt
library-cpp-svnversion
+ library-persqueue-tests
core-testlib-default
api-grpc
cpp-client-resources
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make b/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make
index 0257117997..9076b91d46 100644
--- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make
@@ -25,6 +25,7 @@ SRCS(
PEERDIR(
library/cpp/getopt
library/cpp/svnversion
+ ydb/library/persqueue/tests
ydb/core/testlib/default
ydb/public/api/grpc
ydb/public/sdk/cpp/client/resources
diff --git a/ydb/services/persqueue_v1/ut/ya.make b/ydb/services/persqueue_v1/ut/ya.make
index 7a66c7a8cd..55831635f1 100644
--- a/ydb/services/persqueue_v1/ut/ya.make
+++ b/ydb/services/persqueue_v1/ut/ya.make
@@ -36,6 +36,7 @@ PEERDIR(
library/cpp/digest/md5
ydb/core/testlib/default
ydb/library/aclib
+ ydb/library/persqueue/tests
ydb/library/persqueue/topic_parser
ydb/public/api/grpc
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make
index f88136af9c..a16f60e470 100644
--- a/ydb/services/persqueue_v1/ya.make
+++ b/ydb/services/persqueue_v1/ya.make
@@ -27,7 +27,7 @@ PEERDIR(
ydb/core/ydb_convert
ydb/library/aclib
ydb/library/persqueue/obfuscate
- ydb/library/persqueue/tests
+# ydb/library/persqueue/tests
ydb/library/persqueue/topic_parser
ydb/public/api/grpc
ydb/public/api/grpc/draft