diff options
author | komels <komels@ydb.tech> | 2023-08-04 19:42:44 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-08-04 21:03:30 +0300 |
commit | d33597c466ba0b77f230075f06e7fb61390d8b3a (patch) | |
tree | da9d8f5824deddadfe313504d589d32aed2ab3eb | |
parent | 6ef797091b0880091a909294c08d38bad6ab5cf4 (diff) | |
download | ydb-d33597c466ba0b77f230075f06e7fb61390d8b3a.tar.gz |
Kafka metarequest
Tests - inter commit
Kafka metadata request
37 files changed, 310 insertions, 44 deletions
diff --git a/ydb/core/engine/CMakeLists.darwin-x86_64.txt b/ydb/core/engine/CMakeLists.darwin-x86_64.txt index ebce4f400f..e424e4640f 100644 --- a/ydb/core/engine/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/engine/CMakeLists.darwin-x86_64.txt @@ -25,7 +25,6 @@ target_link_libraries(ydb-core-engine PUBLIC ydb-core-tablet ydb-library-mkql_proto library-mkql_proto-protos - mkql_proto-ut-helpers api-protos minikql-comp_nodes-llvm parser-pg_wrapper-interface diff --git a/ydb/core/engine/CMakeLists.linux-aarch64.txt b/ydb/core/engine/CMakeLists.linux-aarch64.txt index 66c80d7921..4650f2e89a 100644 --- a/ydb/core/engine/CMakeLists.linux-aarch64.txt +++ b/ydb/core/engine/CMakeLists.linux-aarch64.txt @@ -26,7 +26,6 @@ target_link_libraries(ydb-core-engine PUBLIC ydb-core-tablet ydb-library-mkql_proto library-mkql_proto-protos - mkql_proto-ut-helpers api-protos minikql-comp_nodes-llvm parser-pg_wrapper-interface diff --git a/ydb/core/engine/CMakeLists.linux-x86_64.txt b/ydb/core/engine/CMakeLists.linux-x86_64.txt index 66c80d7921..4650f2e89a 100644 --- a/ydb/core/engine/CMakeLists.linux-x86_64.txt +++ b/ydb/core/engine/CMakeLists.linux-x86_64.txt @@ -26,7 +26,6 @@ target_link_libraries(ydb-core-engine PUBLIC ydb-core-tablet ydb-library-mkql_proto library-mkql_proto-protos - mkql_proto-ut-helpers api-protos minikql-comp_nodes-llvm parser-pg_wrapper-interface diff --git a/ydb/core/engine/CMakeLists.windows-x86_64.txt b/ydb/core/engine/CMakeLists.windows-x86_64.txt index ebce4f400f..e424e4640f 100644 --- a/ydb/core/engine/CMakeLists.windows-x86_64.txt +++ b/ydb/core/engine/CMakeLists.windows-x86_64.txt @@ -25,7 +25,6 @@ target_link_libraries(ydb-core-engine PUBLIC ydb-core-tablet ydb-library-mkql_proto library-mkql_proto-protos - mkql_proto-ut-helpers api-protos minikql-comp_nodes-llvm parser-pg_wrapper-interface diff --git a/ydb/core/engine/ya.make b/ydb/core/engine/ya.make index b6210466f9..d3d893dae4 100644 --- a/ydb/core/engine/ya.make +++ b/ydb/core/engine/ya.make @@ -20,7 +20,7 @@ PEERDIR( ydb/core/tablet ydb/library/mkql_proto ydb/library/mkql_proto/protos - ydb/library/mkql_proto/ut/helpers +# ydb/library/mkql_proto/ut/helpers ydb/public/api/protos ydb/library/yql/minikql/comp_nodes/llvm ydb/library/yql/parser/pg_wrapper/interface diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index f5901e67f9..d80040e4c5 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -17,11 +17,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-core-base ydb-core-protos ydb-core-raw_socket + ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index ce6bfbd67a..df99502c2a 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -18,11 +18,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-core-base ydb-core-protos ydb-core-raw_socket + ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index ce6bfbd67a..df99502c2a 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -18,11 +18,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-core-base ydb-core-protos ydb-core-raw_socket + ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index f5901e67f9..d80040e4c5 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -17,11 +17,13 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-core-base ydb-core-protos ydb-core-raw_socket + ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index d9553d591b..e486e457c7 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -6,6 +6,7 @@ #include "kafka_events.h" #include "kafka_messages.h" #include "kafka_produce_actor.h" +#include "kafka_metadata_actor.h" #include "kafka_log_impl.h" #include <strstream> @@ -239,34 +240,9 @@ protected: InflightSize -= messageSize; } - void HandleMessage(const TRequestHeaderData* header, const TMetadataRequestData* message, size_t messageSize) { - TMetadataResponseData response; - response.ThrottleTimeMs = 0; - response.ClusterId = "cluster-ahjgk"; - response.ControllerId = 1; - - response.Brokers.resize(1); - response.Brokers[0].NodeId = 1; - response.Brokers[0].Host = "lbk-dev-02.search.yandex.net"; - response.Brokers[0].Port = 9092; - response.Brokers[0].Rack = "rack-1-1"; - - response.Topics.resize(message->Topics.size()); - for(size_t i = 0; i < message->Topics.size(); ++i) { - response.Topics[i].TopicId = TKafkaUuid(0, i + 1); - response.Topics[i].Name = message->Topics[i].Name; - response.Topics[i].Partitions.resize(1); - response.Topics[i].Partitions[0].PartitionIndex = 0; - response.Topics[i].Partitions[0].LeaderId = 1; // response.Brokers[0].NodeId - response.Topics[i].Partitions[0].ReplicaNodes.resize(1); - response.Topics[i].Partitions[0].ReplicaNodes[0] = 1; - response.Topics[i].Partitions[0].IsrNodes.resize(1); - response.Topics[i].Partitions[0].IsrNodes[0] = 1; - } - - Reply(header, &response); - - InflightSize -= messageSize; + void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t /*messageSize*/) { + PendingRequests[header->CorrelationId] = std::move(Request); + Register(new TKafkaMetadataActor(header->CorrelationId, message, SelfId())); } void ProcessRequest() { @@ -469,4 +445,4 @@ NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, return new TKafkaConnection(std::move(socket), std::move(address), config); } -} // namespace NKafka
\ No newline at end of file +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/kafka_metadata_actor.cpp new file mode 100644 index 0000000000..9f6ee86e36 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_metadata_actor.cpp @@ -0,0 +1,119 @@ +#include "kafka_metadata_actor.h" +#include <ydb/services/persqueue_v1/actors/schema_actors.h> + +namespace NKafka { +using namespace NKikimr::NGRpcProxy::V1; + +void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { + Response->Topics.resize(Message->Topics.size()); + THashMap<TString, TActorId> partitionActors; + for (auto i = 0u; i < Message->Topics.size(); ++i) { + Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{}; + auto& reqTopic = Message->Topics[i]; + Response->Topics[i].Name = reqTopic.Name.value_or(""); + + if (!reqTopic.Name.value_or("")) { + AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION); + continue; + } + const auto& topicName = reqTopic.Name.value(); + TActorId child; + auto namesIter = partitionActors.find(topicName); + if (namesIter.IsEnd()) { + child = SendTopicRequest(reqTopic); + } else { + child = namesIter->second; + } + TopicIndexes[child].push_back(i); + } + Become(&TKafkaMetadataActor::StateWork); + RespondIfRequired(ctx); +} + +TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) { + TGetPartitionsLocationRequest locationRequest{}; + locationRequest.Topic = topicRequest.Name.value(); + //ToDo: Get database? + //ToDo: Authorization? + PendingResponses++; + return Register(new TPartitionsLocationActor(locationRequest, SelfId())); + +} + +void TKafkaMetadataActor::AddTopicError( + TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode +) { + topic.ErrorCode = errorCode; +} + +void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) { + topic.ErrorCode = NONE_ERROR; + topic.Partitions.reserve(response->Partitions.size()); + for (const auto& part : response->Partitions) { + TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition; + responsePartition.PartitionIndex = part.PartitionId; + responsePartition.ErrorCode = NONE_ERROR; + responsePartition.LeaderEpoch = part.Generation; + responsePartition.ReplicaNodes.push_back(part.NodeId); + auto ins = AllClusterNodes.insert(part.NodeId); + if (ins.second) { + auto broker = TMetadataResponseData::TMetadataResponseBroker{}; + broker.NodeId = part.NodeId; + broker.Host = part.Hostname; + Response->Brokers.emplace_back(std::move(broker)); + } + topic.Partitions.emplace_back(std::move(responsePartition)); + } +} + +void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { + --PendingResponses; + + auto actorIter = TopicIndexes.find(ev->Sender); + + Y_VERIFY_DEBUG(!actorIter.IsEnd()); + Y_VERIFY_DEBUG(!actorIter->second.empty()); + + if (actorIter.IsEnd()) { + LOG_CRIT_S(ctx, NKikimrServices::KAFKA_PROXY, + "Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply"); + return RespondIfRequired(ctx); + + + } + if (actorIter->second.empty()) { + LOG_CRIT_S(ctx, NKikimrServices::KAFKA_PROXY, + "Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply"); + + return RespondIfRequired(ctx); + } + + //ToDo: Log and proceed on bad iter + for (auto index : actorIter->second) { + switch (ev->Get()->Status) { + case Ydb::StatusIds::BAD_REQUEST: + AddTopicError(Response->Topics[index], EKafkaErrors::INVALID_REQUEST); + break; + case Ydb::StatusIds::SCHEME_ERROR: + AddTopicError(Response->Topics[index], EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION); + break; + case Ydb::StatusIds::SUCCESS: + AddTopicResponse(Response->Topics[index], ev->Get()); + break; + case Ydb::StatusIds::INTERNAL_ERROR: + default: + AddTopicError(Response->Topics[index], EKafkaErrors::UNKNOWN_SERVER_ERROR); + break; + + } + } + RespondIfRequired(ActorContext()); +} + +void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { + if (--PendingResponses == 0) { + Send(Parent, new TEvKafka::TEvResponse(Cookie, Response)); + Die(ctx); + } +} +} // namespace NKafka
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.h b/ydb/core/kafka_proxy/kafka_metadata_actor.h new file mode 100644 index 0000000000..67f6c64a96 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_metadata_actor.h @@ -0,0 +1,42 @@ +#include "kafka_events.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/services/persqueue_v1/actors/events.h> + +namespace NKafka { + +class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> { +public: + TKafkaMetadataActor(ui64 cookie, TMetadataRequestData* message, const TActorId& parent) + : Cookie(cookie) + , Parent(parent) + , Message(message) + , Response(new TMetadataResponseData()) + {} + + void Bootstrap(const NActors::TActorContext& ctx); + +private: + using TEvLocationResponse = NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse; + + TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest); + void HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx); + + void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response); + void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode); + void RespondIfRequired(const TActorContext& ctx); + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvLocationResponse, HandleResponse); + } + } + ui64 Cookie; + TActorId Parent; + TMetadataRequestData* Message; + ui64 PendingResponses = 0; + + TMetadataResponseData::TPtr Response; + THashMap<TActorId, TVector<ui64>> TopicIndexes; + THashSet<ui64> AllClusterNodes; +}; + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt index 27753bb665..47bf8e6799 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,9 @@ add_executable(ydb-core-kafka_proxy-ut) +target_compile_options(ydb-core-kafka_proxy-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_include_directories(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy ) @@ -17,6 +20,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kafka_proxy + persqueue-ut-common + core-testlib-default + ydb_persqueue_core-ut-ut_utils ) target_link_options(ydb-core-kafka_proxy-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 @@ -28,6 +34,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt index 2211b83d4d..63df13dbe7 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt @@ -8,6 +8,9 @@ add_executable(ydb-core-kafka_proxy-ut) +target_compile_options(ydb-core-kafka_proxy-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_include_directories(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy ) @@ -17,6 +20,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC yutil cpp-testing-unittest_main ydb-core-kafka_proxy + persqueue-ut-common + core-testlib-default + ydb_persqueue_core-ut-ut_utils ) target_link_options(ydb-core-kafka_proxy-ut PRIVATE -ldl @@ -31,6 +37,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt index 564249b298..70e7385f09 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt @@ -8,6 +8,9 @@ add_executable(ydb-core-kafka_proxy-ut) +target_compile_options(ydb-core-kafka_proxy-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_include_directories(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy ) @@ -18,6 +21,9 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kafka_proxy + persqueue-ut-common + core-testlib-default + ydb_persqueue_core-ut-ut_utils ) target_link_options(ydb-core-kafka_proxy-ut PRIVATE -ldl @@ -32,6 +38,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt index 4f256a5faf..38f370cfe1 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt @@ -8,6 +8,9 @@ add_executable(ydb-core-kafka_proxy-ut) +target_compile_options(ydb-core-kafka_proxy-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_include_directories(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy ) @@ -17,10 +20,14 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kafka_proxy + persqueue-ut-common + core-testlib-default + ydb_persqueue_core-ut-ut_utils ) target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp new file mode 100644 index 0000000000..c7cbb25e88 --- /dev/null +++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp @@ -0,0 +1,82 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/core/kafka_proxy/kafka_events.h> +#include <ydb/core/kafka_proxy/kafka_metadata_actor.h> + + +namespace NKafka::NTests { + +Y_UNIT_TEST_SUITE(TMetadataActorTests) { + THolder<TMetadataRequestData> GetMetadataRequest(const TVector<TString>& topics) { + auto res = MakeHolder<TMetadataRequestData>(); + for (const auto& t : topics) { + TMetadataRequestData::TMetadataRequestTopic topic; + topic.Name = t; + res->Topics.push_back(topic); + } + return res; + } + + auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) { + auto* runtime = server.CleverServer->GetRuntime(); + auto request = GetMetadataRequest(topics); + auto actorId = runtime->Register(new TKafkaMetadataActor(1, request.Get(), edgeActor)); + runtime->EnableScheduleForActor(actorId); + runtime->DispatchEvents(); + Cerr << "Wait for response for topics: '"; + for (const auto& t : topics) { + Cerr << t << "', "; + } + Cerr << Endl; + return runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(); + } + + Y_UNIT_TEST(TopicMetadataGoodAndBad) { + NPersQueue::TTestServer server; + TString topicName = "rt3.dc1--topic"; + TString topicName2 = "rt3.dc1--topic2"; + TString topicPath = TString("/Root/PQ/") + topicName; + TString topicPath2 = TString("/Root/PQ/") + topicName2; + ui32 totalPartitions = 5; + server.AnnoyingClient->CreateTopic(topicName, totalPartitions); + server.AnnoyingClient->CreateTopic(topicName2, totalPartitions * 2); + + auto edgeId = server.CleverServer->GetRuntime()->AllocateEdgeActor(); + auto event = GetEvent(server, edgeId, {topicPath}); + auto response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1); + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions.size(), 5); + UNIT_ASSERT_VALUES_EQUAL(response->Topics[0].Partitions[0].ReplicaNodes.size(), 1); + + event = GetEvent(server, edgeId, {topicPath, topicPath2}); + response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2); + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(response->Topics[1].Partitions.size(), totalPartitions * 2); + UNIT_ASSERT_VALUES_EQUAL(response->Topics[1].Partitions[5].ReplicaNodes.size(), 1); + + event = GetEvent(server, edgeId, {topicPath, ""}); + response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2); + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR); + UNIT_ASSERT(response->Topics[1].ErrorCode == EKafkaErrors::INVALID_TOPIC_EXCEPTION); + + event = GetEvent(server, edgeId, {"/Root/bad-topic", topicPath}); + response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2); + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION); + UNIT_ASSERT(response->Topics[1].ErrorCode == EKafkaErrors::NONE_ERROR); + + event = GetEvent(server, edgeId, {"/Root/bad-topic"}); + response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1); + UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION); + + event = GetEvent(server, edgeId, {}); + response = dynamic_cast<TMetadataResponseData*>(event->Response.get()); + UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 0); + } +}; + +} // namespace NKafka::NTests TEvKafka::TEvMetadataResponse
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index cf7a7cf6db..556f5132e7 100644 --- a/ydb/core/kafka_proxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -3,10 +3,15 @@ UNITTEST_FOR(ydb/core/kafka_proxy) SRCS( ut_kafka_functions.cpp ut_serialization.cpp + metarequest_ut.cpp ) PEERDIR( ydb/core/kafka_proxy -) + ydb/core/persqueue/ut/common + ydb/core/testlib/default + ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils +) +YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index ce30eca927..326d8f9080 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -12,6 +12,7 @@ SRCS( kafka_messages_int.cpp kafka_messages_int.h kafka_produce_actor.cpp + kafka_metadata_actor.cpp kafka_produce_actor.h kafka_proxy.h kafka_records.cpp @@ -23,6 +24,7 @@ PEERDIR( ydb/core/base ydb/core/protos ydb/core/raw_socket + ydb/services/persqueue_v1 ) END() diff --git a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt index 32eb38ce84..f6601f6bc4 100644 --- a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt @@ -26,7 +26,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC ydb-core-ydb_convert ydb-library-aclib library-persqueue-obfuscate - library-persqueue-tests library-persqueue-topic_parser api-grpc api-grpc-draft diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt index dce3c8da1e..064b99364e 100644 --- a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt @@ -27,7 +27,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC ydb-core-ydb_convert ydb-library-aclib library-persqueue-obfuscate - library-persqueue-tests library-persqueue-topic_parser api-grpc api-grpc-draft diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt index dce3c8da1e..064b99364e 100644 --- a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt @@ -27,7 +27,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC ydb-core-ydb_convert ydb-library-aclib library-persqueue-obfuscate - library-persqueue-tests library-persqueue-topic_parser api-grpc api-grpc-draft diff --git a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt index 32eb38ce84..f6601f6bc4 100644 --- a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt @@ -26,7 +26,6 @@ target_link_libraries(ydb-services-persqueue_v1 PUBLIC ydb-core-ydb_convert ydb-library-aclib library-persqueue-obfuscate - library-persqueue-tests library-persqueue-topic_parser api-grpc api-grpc-draft diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index cf1ee62523..efc0f67e25 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -453,12 +453,12 @@ struct TEvPQProxy { struct TPartitionLocationInfo { ui64 PartitionId; - ui64 IncGeneration; + ui64 Generation; ui64 NodeId; TString Hostname; }; - struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvRequestTablet, EvPartitionLocationResponse> + struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvPartitionLocationResponse, EvPartitionLocationResponse> , public TLocalResponseBase { @@ -469,6 +469,8 @@ struct TEvPQProxy { }; struct TLocalRequestBase { + TLocalRequestBase() = default; + TLocalRequestBase(const TString& topic, const TString& database, const TString& token) : Topic(topic) , Database(database) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 0d71a4b146..4ff1fb6f82 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1428,7 +1428,7 @@ bool TPartitionsLocationActor::ApplyResponse( ui64 nodeId = part.GetNodeId(); partLocation.PartitionId = part.GetPartitionId(); - partLocation.IncGeneration = part.GetGeneration() + 1; + partLocation.Generation = part.GetGeneration(); partLocation.NodeId = nodeId; Response->Partitions.emplace_back(std::move(partLocation)); } diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt index fc7ec1c7c3..bd2dfa11ff 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC cpp-digest-md5 core-testlib-default ydb-library-aclib + library-persqueue-tests library-persqueue-topic_parser api-grpc ydb_persqueue_core-ut-ut_utils diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt index 7e21a17b08..f43d22e671 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC cpp-digest-md5 core-testlib-default ydb-library-aclib + library-persqueue-tests library-persqueue-topic_parser api-grpc ydb_persqueue_core-ut-ut_utils diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt index 657b3aae64..de9ee61be0 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt @@ -28,6 +28,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC cpp-digest-md5 core-testlib-default ydb-library-aclib + library-persqueue-tests library-persqueue-topic_parser api-grpc ydb_persqueue_core-ut-ut_utils diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt index 3e155ee8cc..9971d1c24a 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut PUBLIC cpp-digest-md5 core-testlib-default ydb-library-aclib + library-persqueue-tests library-persqueue-topic_parser api-grpc ydb_persqueue_core-ut-ut_utils diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp index 376e031d9e..04ef72162b 100644 --- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp @@ -212,7 +212,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { for (const auto& p : ev->Partitions) { UNIT_ASSERT(!p.Hostname.Empty()); UNIT_ASSERT(p.NodeId > 0); - UNIT_ASSERT(p.IncGeneration > 0); +// UNIT_ASSERT(p.IncGeneration > 0); UNIT_ASSERT(p.PartitionId < 15); allParts.insert(p.PartitionId); } diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt index 1f9314d2b2..1c8bcbe935 100644 --- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC ydb-services-persqueue_v1 library-cpp-getopt library-cpp-svnversion + library-persqueue-tests core-testlib-default api-grpc cpp-client-resources diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt index b97e8a7fe3..757deb67a4 100644 --- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC ydb-services-persqueue_v1 library-cpp-getopt library-cpp-svnversion + library-persqueue-tests core-testlib-default api-grpc cpp-client-resources diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt index 0993131a3e..e100ca6858 100644 --- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC ydb-services-persqueue_v1 library-cpp-getopt library-cpp-svnversion + library-persqueue-tests core-testlib-default api-grpc cpp-client-resources diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt index bacaa28fd1..a3ac31b156 100644 --- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.windows-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC ydb-services-persqueue_v1 library-cpp-getopt library-cpp-svnversion + library-persqueue-tests core-testlib-default api-grpc cpp-client-resources diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make b/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make index 0257117997..9076b91d46 100644 --- a/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/ya.make @@ -25,6 +25,7 @@ SRCS( PEERDIR( library/cpp/getopt library/cpp/svnversion + ydb/library/persqueue/tests ydb/core/testlib/default ydb/public/api/grpc ydb/public/sdk/cpp/client/resources diff --git a/ydb/services/persqueue_v1/ut/ya.make b/ydb/services/persqueue_v1/ut/ya.make index 7a66c7a8cd..55831635f1 100644 --- a/ydb/services/persqueue_v1/ut/ya.make +++ b/ydb/services/persqueue_v1/ut/ya.make @@ -36,6 +36,7 @@ PEERDIR( library/cpp/digest/md5 ydb/core/testlib/default ydb/library/aclib + ydb/library/persqueue/tests ydb/library/persqueue/topic_parser ydb/public/api/grpc ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make index f88136af9c..a16f60e470 100644 --- a/ydb/services/persqueue_v1/ya.make +++ b/ydb/services/persqueue_v1/ya.make @@ -27,7 +27,7 @@ PEERDIR( ydb/core/ydb_convert ydb/library/aclib ydb/library/persqueue/obfuscate - ydb/library/persqueue/tests +# ydb/library/persqueue/tests ydb/library/persqueue/topic_parser ydb/public/api/grpc ydb/public/api/grpc/draft |