diff options
author | savnik <savnik@yandex-team.com> | 2023-09-12 19:24:55 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-09-12 20:17:29 +0300 |
commit | 2a2bcd52988e79127e66ac6fbd95b5efdfd331ab (patch) | |
tree | 58afa7fad7bc5aa6deef27030857576e11cb2a1d | |
parent | 0426efbfb18ec3d5afbec49cbc2c047b4392296a (diff) | |
download | ydb-2a2bcd52988e79127e66ac6fbd95b5efdfd331ab.tar.gz |
Kafka LIST_OFFSETS
18 files changed, 1216 insertions, 25 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index e5665674bfa..64b8bf406e5 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -34,6 +34,8 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index b78f33c0f11..e14878f0cb7 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -35,6 +35,8 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index b78f33c0f11..e14878f0cb7 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt 
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -35,6 +35,8 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index e5665674bfa..64b8bf406e5 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -34,6 +34,8 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 637e222c8ad..dc93c5d6277 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -70,11 +70,25 @@ inline bool RequireAuthentication(EApiKey apiKey) { return !(EApiKey::API_VERSIONS == apiKey || EApiKey::SASL_HANDSHAKE == apiKey || EApiKey::SASL_AUTHENTICATE == apiKey); } 
+inline EKafkaErrors ConvertErrorCode(Ydb::StatusIds::StatusCode status) { + switch (status) { + case Ydb::StatusIds::BAD_REQUEST: + return EKafkaErrors::INVALID_REQUEST; + case Ydb::StatusIds::SCHEME_ERROR: + return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + case Ydb::StatusIds::UNAUTHORIZED: + return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; + default: + return EKafkaErrors::UNKNOWN_SERVER_ERROR; + } +} + NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TApiVersionsRequestData>& message); NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TInitProducerIdRequestData>& message); NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message); NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context); NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message); NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message); +NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp new file mode 100644 index 00000000000..835a850f731 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp @@ -0,0 +1,132 @@ +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/base/ticket_parser.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/core/kafka_proxy/kafka_events.h> +#include 
<ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <ydb/services/persqueue_v1/actors/schema_actors.h> + +#include "actors.h" +#include "kafka_list_offsets_actor.h" +#include "kafka_topic_offsets_actor.h" + + +namespace NKafka { + +using namespace NKikimr::NGRpcProxy::V1; + +using TListOffsetsPartitionResponse = TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse; + + +NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message) { + return new TKafkaListOffsetsActor(context, correlationId, message); +} + +void TKafkaListOffsetsActor::Bootstrap(const NActors::TActorContext& ctx) { + ListOffsetsResponseData->Topics.resize(ListOffsetsRequestData->Topics.size()); + + SendOffsetsRequests(ctx); + Become(&TKafkaListOffsetsActor::StateWork); + RespondIfRequired(ctx); +} + +void TKafkaListOffsetsActor::SendOffsetsRequests(const NActors::TActorContext& ctx) { + for (size_t i = 0; i < ListOffsetsRequestData->Topics.size(); ++i) { + auto &requestTopic = ListOffsetsRequestData->Topics[i]; + auto &responseTopic = ListOffsetsResponseData->Topics[i]; + + responseTopic = TListOffsetsResponseData::TListOffsetsTopicResponse{}; + + if (!requestTopic.Name.has_value()) { + HandleMissingTopicName(requestTopic, responseTopic); + continue; + } + + responseTopic.Name = requestTopic.Name; + std::unordered_map<ui64, TPartitionRequestInfo> partitionsMap; + + for (auto& partition: requestTopic.Partitions) { + partitionsMap[partition.PartitionIndex] = TPartitionRequestInfo{.Timestamp = partition.Timestamp}; + } + + TopicsRequestsInfo[SendOffsetsRequest(requestTopic, ctx)] = {i, partitionsMap}; + } +} + +void TKafkaListOffsetsActor::HandleMissingTopicName(const TListOffsetsRequestData::TListOffsetsTopic& requestTopic, TListOffsetsResponseData::TListOffsetsTopicResponse& responseTopic) { + for (auto& partition: requestTopic.Partitions) { + TListOffsetsPartitionResponse 
responsePartition; + responsePartition.PartitionIndex = partition.PartitionIndex; + responsePartition.ErrorCode = INVALID_TOPIC_EXCEPTION; + ErrorCode = INVALID_TOPIC_EXCEPTION; + responseTopic.Partitions.emplace_back(std::move(responsePartition)); + } +} + +TActorId TKafkaListOffsetsActor::SendOffsetsRequest(const TListOffsetsRequestData::TListOffsetsTopic& topic, const NActors::TActorContext&) { + KAFKA_LOG_D("Get offsets for topic '" << topic.Name << "' for user '" << Context->UserToken->GetUserSID() << "'"); + + TEvKafka::TGetOffsetsRequest offsetsRequest; + offsetsRequest.Topic = topic.Name.value(); + offsetsRequest.Token = Context->UserToken->GetSerializedToken(); + offsetsRequest.Database = Context->DatabasePath; + + for (const auto& partitionRequest: topic.Partitions) { + offsetsRequest.PartitionIds.push_back(partitionRequest.PartitionIndex); + } + + PendingResponses++; + return Register(new TTopicOffsetsActor(offsetsRequest, SelfId())); +} + +void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev, const TActorContext& ctx) { + --PendingResponses; + auto it = TopicsRequestsInfo.find(ev->Sender); + if (it == TopicsRequestsInfo.end()) { + KAFKA_LOG_CRIT("ListOffsets actor: received unexpected TEvTopicOffsetsResponse. 
Ignoring."); + return RespondIfRequired(ctx); + } + + const auto& topicIndex = it->second.first; + auto& partitionsRequestInfoMap = it->second.second; + + auto& responseTopic = ListOffsetsResponseData->Topics[topicIndex]; + responseTopic.Partitions.reserve(ev->Get()->Partitions.size()); + + for (auto& partition: ev->Get()->Partitions) { + TListOffsetsPartitionResponse responsePartition {}; + responsePartition.PartitionIndex = partition.PartitionId; + + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + responsePartition.LeaderEpoch = partition.Generation; + + auto timestamp = partitionsRequestInfoMap[partition.PartitionId].Timestamp; + responsePartition.Timestamp = TIMESTAMP_DEFAULT_RESPONSE_VALUE; + + if (timestamp == TIMESTAMP_START_OFFSET) { + responsePartition.Offset = partition.StartOffset; + responsePartition.ErrorCode = NONE_ERROR; + } else if (timestamp == TIMESTAMP_END_OFFSET) { + responsePartition.Offset = partition.EndOffset; + responsePartition.ErrorCode = NONE_ERROR; + } else { + responsePartition.ErrorCode = INVALID_REQUEST; //TODO savnik: handle it + ErrorCode = INVALID_REQUEST; + } + } else { + responsePartition.ErrorCode = ConvertErrorCode(ev->Get()->Status); + } + + responseTopic.Partitions.emplace_back(std::move(responsePartition)); + } + + RespondIfRequired(ctx); +} + +void TKafkaListOffsetsActor::RespondIfRequired(const TActorContext& ctx) { + if (PendingResponses == 0) { + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, ListOffsetsResponseData, ErrorCode)); + Die(ctx); + } +} + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h new file mode 100644 index 00000000000..8ae7afb5aac --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h @@ -0,0 +1,57 @@ +#pragma once + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/kafka_proxy/kafka_events.h> +#include "ydb/library/aclib/aclib.h" 
+#include <ydb/services/persqueue_v1/actors/events.h> + +#include "actors.h" + +namespace NKafka { + +struct TPartitionRequestInfo { + i64 Timestamp; +}; + +class TKafkaListOffsetsActor: public NActors::TActorBootstrapped<TKafkaListOffsetsActor> { + +static constexpr int TIMESTAMP_START_OFFSET = -2; +static constexpr int TIMESTAMP_END_OFFSET = -1; +static constexpr int TIMESTAMP_DEFAULT_RESPONSE_VALUE = -1; + +public: + TKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message) + : Context(context) + , CorrelationId(correlationId) + , ListOffsetsRequestData(message) + , ListOffsetsResponseData(new TListOffsetsResponseData()) { + } + +void Bootstrap(const NActors::TActorContext& ctx); + +void Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev, const NActors::TActorContext& ctx); +void RespondIfRequired(const NActors::TActorContext& ctx); + +private: + void SendOffsetsRequests(const NActors::TActorContext& ctx); + TActorId SendOffsetsRequest(const TListOffsetsRequestData::TListOffsetsTopic& topic, const NActors::TActorContext&); + void HandleMissingTopicName(const TListOffsetsRequestData::TListOffsetsTopic& requestTopic, TListOffsetsResponseData::TListOffsetsTopicResponse& responseTopic); + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvTopicOffsetsResponse, Handle); + } + } + +private: + const TContext::TPtr Context; + const ui64 CorrelationId; + const TMessagePtr<TListOffsetsRequestData> ListOffsetsRequestData; + ui64 PendingResponses = 0; + const TListOffsetsResponseData::TPtr ListOffsetsResponseData; + + EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR; + std::unordered_map<TActorId, std::pair<size_t, std::unordered_map<ui64, TPartitionRequestInfo>>> TopicsRequestsInfo; +}; + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 4a90fb3a009..5a24dfc20ff 100644 --- 
a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -34,6 +34,7 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { auto namesIter = partitionActors.find(topicName); if (namesIter.IsEnd()) { child = SendTopicRequest(reqTopic); + partitionActors[topicName] = child; } else { child = namesIter->second; } @@ -104,19 +105,6 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo } } -EKafkaErrors ConvertErrorCode(Ydb::StatusIds::StatusCode status) { - switch (status) { - case Ydb::StatusIds::BAD_REQUEST: - return EKafkaErrors::INVALID_REQUEST; - case Ydb::StatusIds::SCHEME_ERROR: - return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; - case Ydb::StatusIds::UNAUTHORIZED: - return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; - default: - return EKafkaErrors::UNKNOWN_SERVER_ERROR; - } -} - void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) { --PendingResponses; diff --git a/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp new file mode 100644 index 00000000000..9a22b431bb9 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp @@ -0,0 +1,79 @@ +#include "kafka_topic_offsets_actor.h" + +#include <ydb/core/kafka_proxy/kafka_events.h> +#include <ydb/services/persqueue_v1/actors/schema_actors.h> + + +namespace NKafka { + +TTopicOffsetsActor::TTopicOffsetsActor(const TEvKafka::TGetOffsetsRequest& request, const TActorId& requester) + : TBase(request, requester) + , TDescribeTopicActorImpl(NKikimr::NGRpcProxy::V1::TDescribeTopicActorSettings::DescribeTopic( + true, + false, + request.PartitionIds)) +{ +} + +void TTopicOffsetsActor::Bootstrap(const NActors::TActorContext&) +{ + SendDescribeProposeRequest(); + Become(&TTopicOffsetsActor::StateWork); +} + +void TTopicOffsetsActor::StateWork(TAutoPtr<IEventHandle>& ev) { + switch 
(ev->GetTypeRewrite()) { + default: + if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) { + TBase::StateWork(ev); + }; + } +} + +void TTopicOffsetsActor::HandleCacheNavigateResponse( + NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev +) { + if (!TBase::HandleCacheNavigateResponseBase(ev)) { + return; + } + std::unordered_set<ui32> partititons; + for (ui32 i = 0; i < PQGroupInfo->Description.PartitionsSize(); i++) { + auto part = PQGroupInfo->Description.GetPartitions(i).GetPartitionId(); + partititons.insert(part); + } + + for (auto requestedPartition: Settings.Partitions) { + if (partititons.find(requestedPartition) == partititons.end()) { + return RaiseError( + TStringBuilder() << "No partition " << requestedPartition << " in topic", + Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::SCHEME_ERROR, ActorContext() + ); + } + } + + ProcessTablets(PQGroupInfo->Description, this->ActorContext()); +} + +void TTopicOffsetsActor::ApplyResponse(TTabletInfo& tablet, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext&) { + const auto& record = ev->Get()->Record; + for (auto i = 0u; i < record.PartResultSize(); i++) { + const auto& part = record.GetPartResult(i); + TEvKafka::TPartitionOffsetsInfo resultPartititon; + resultPartititon.PartitionId = part.GetPartition(); + resultPartititon.StartOffset = part.GetStartOffset(); + resultPartititon.EndOffset = part.GetEndOffset(); + resultPartititon.Generation = tablet.Generation; + Response->Partitions.emplace_back(std::move(resultPartititon)); + } +} + +void TTopicOffsetsActor::Reply(const TActorContext&) { + this->RespondWithCode(Ydb::StatusIds::SUCCESS); +} + +void TTopicOffsetsActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) { + this->AddIssue(NKikimr::NGRpcProxy::V1::FillIssue(error, errorCode)); + this->RespondWithCode(status); +} + +}// namespace NKafka diff
--git a/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.h b/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.h new file mode 100644 index 00000000000..9af2194a0ec --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.h @@ -0,0 +1,49 @@ +#pragma once + +#include "actors.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/library/aclib/aclib.h> +#include <ydb/services/persqueue_v1/actors/events.h> +#include <ydb/services/persqueue_v1/actors/schema_actors.h> +#include <ydb/services/lib/actors/pq_schema_actor.h> +#include <ydb/core/kafka_proxy/kafka_events.h> + +namespace NKafka { + +class TTopicOffsetsActor : public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetsActor, + TEvKafka::TGetOffsetsRequest, + TEvKafka::TEvTopicOffsetsResponse> + , public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl { + +using TBase = TPQInternalSchemaActor<TTopicOffsetsActor, + TEvKafka::TGetOffsetsRequest, + TEvKafka::TEvTopicOffsetsResponse>; + +public: + TTopicOffsetsActor(const TEvKafka::TGetOffsetsRequest& request, const TActorId& requester); + + ~TTopicOffsetsActor() = default; + + void Bootstrap(const NActors::TActorContext& ctx) override; + + void StateWork(TAutoPtr<IEventHandle>& ev); + + void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; + + virtual void ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&, + const TActorContext&) override { + Y_FAIL(); + } + bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr&, const TActorContext&) override { + Y_FAIL(); + } + + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; + + void Reply(const TActorContext&) override; + + void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const 
TActorContext&) override; +}; + +}// namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 0262515ac63..d5e4a5703ed 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -171,12 +171,10 @@ protected: } void SendRequestMetrics(const TActorContext& ctx) { - if (Context) { - ctx.Send(MakeKafkaMetricsServiceID(), - new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, Request->Method, "", "api.kafka.request.count", ""))); - ctx.Send(MakeKafkaMetricsServiceID(), + ctx.Send(MakeKafkaMetricsServiceID(), + new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, Request->Method, "", "api.kafka.request.count", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(Request->Size, BuildLabels(Context, Request->Method, "", "api.kafka.request.bytes", ""))); - } } void SendResponseMetrics(const TString method, const TInstant requestStartTime, i32 bytes, EKafkaErrors errorCode, const TActorContext& ctx) { @@ -244,11 +242,15 @@ protected: Register(CreateKafkaSaslHandshakeActor(Context, header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TListOffsetsRequestData>& message) { + Register(CreateKafkaListOffsetsActor(Context, header->CorrelationId, message)); + } + template<class T> TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) { return TMessagePtr<T>(request->Buffer, request->Message); } - + void ProcessRequest(const TActorContext& ctx) { KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); @@ -284,6 +286,10 @@ protected: HandleMessage(&Request->Header, Cast<TSaslAuthenticateRequestData>(Request)); break; + case LIST_OFFSETS: + HandleMessage(&Request->Header, Cast<TListOffsetsRequestData>(Request)); + return; + default: KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << 
Request->Header.RequestApiKey); PassAway(); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index 32d55f3a490..523ea3b616c 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -2,6 +2,7 @@ #include <library/cpp/actors/core/event_local.h> #include <ydb/core/base/events.h> +#include <ydb/services/persqueue_v1/actors/events.h> #include "kafka_messages.h" #include "ydb/library/aclib/aclib.h" @@ -20,6 +21,7 @@ struct TEvKafka { EvWakeup, EvUpdateCounter, EvUpdateHistCounter, + EvTopicOffsetsResponse, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -130,6 +132,33 @@ struct TEvKafka { struct TEvWakeup : public TEventLocal<TEvWakeup, EvWakeup> { }; + +struct TPartitionOffsetsInfo { + ui64 PartitionId; + ui64 Generation; + ui64 StartOffset; + ui64 EndOffset; +}; + +struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase { + TGetOffsetsRequest() = default; + TGetOffsetsRequest(const TString& topic, const TString& database, const TString& token, const TVector<ui32>& partitionIds) + : TLocalRequestBase(topic, database, token) + , PartitionIds(partitionIds) + {} + + TVector<ui32> PartitionIds; +}; + +struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse> + , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase +{ + TEvTopicOffsetsResponse() + {} + + TVector<TPartitionOffsetsInfo> Partitions; +}; + }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 52cb51d18f1..89b32e0efe9 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -10,6 +10,7 @@ const std::unordered_map<EApiKey, TString> EApiKeyNames = { {EApiKey::HEADER, "HEADER"}, {EApiKey::PRODUCE, "PRODUCE"}, {EApiKey::FETCH, "FETCH"}, + {EApiKey::LIST_OFFSETS, "LIST_OFFSETS"}, {EApiKey::METADATA, "METADATA"}, 
{EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"}, {EApiKey::API_VERSIONS, "API_VERSIONS"}, @@ -24,6 +25,8 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) { return std::make_unique<TProduceRequestData>(); case FETCH: return std::make_unique<TFetchRequestData>(); + case LIST_OFFSETS: + return std::make_unique<TListOffsetsRequestData>(); case METADATA: return std::make_unique<TMetadataRequestData>(); case SASL_HANDSHAKE: @@ -45,6 +48,8 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) { return std::make_unique<TProduceResponseData>(); case FETCH: return std::make_unique<TFetchResponseData>(); + case LIST_OFFSETS: + return std::make_unique<TListOffsetsResponseData>(); case METADATA: return std::make_unique<TMetadataResponseData>(); case SASL_HANDSHAKE: @@ -74,6 +79,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case LIST_OFFSETS: + if (_version >= 6) { + return 2; + } else { + return 1; + } case METADATA: if (_version >= 9) { return 2; @@ -120,6 +131,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 0; } + case LIST_OFFSETS: + if (_version >= 6) { + return 1; + } else { + return 0; + } case METADATA: if (_version >= 9) { return 1; @@ -1472,6 +1489,379 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransac // +// TListOffsetsRequestData +// +const TListOffsetsRequestData::ReplicaIdMeta::Type TListOffsetsRequestData::ReplicaIdMeta::Default = 0; +const TListOffsetsRequestData::IsolationLevelMeta::Type TListOffsetsRequestData::IsolationLevelMeta::Default = 0; + +TListOffsetsRequestData::TListOffsetsRequestData() + : ReplicaId(ReplicaIdMeta::Default) + , IsolationLevel(IsolationLevelMeta::Default) +{} + +void TListOffsetsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't 
read version " << _version << " of TListOffsetsRequestData"; + } + NPrivate::Read<ReplicaIdMeta>(_readable, _version, ReplicaId); + NPrivate::Read<IsolationLevelMeta>(_readable, _version, IsolationLevel); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TListOffsetsRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ReplicaIdMeta>(_collector, _writable, _version, ReplicaId); + NPrivate::Write<IsolationLevelMeta>(_collector, _writable, _version, IsolationLevel); + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ReplicaIdMeta>(_collector, _version, ReplicaId); + NPrivate::Size<IsolationLevelMeta>(_collector, _version, IsolationLevel); + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// 
TListOffsetsRequestData::TListOffsetsTopic +// +const TListOffsetsRequestData::TListOffsetsTopic::NameMeta::Type TListOffsetsRequestData::TListOffsetsTopic::NameMeta::Default = {""}; + +TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsTopic() + : Name(NameMeta::Default) +{} + +void TListOffsetsRequestData::TListOffsetsTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TListOffsetsRequestData::TListOffsetsTopic"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TListOffsetsRequestData::TListOffsetsTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsRequestData::TListOffsetsTopic"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsRequestData::TListOffsetsTopic::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + 
NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition +// +const TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::PartitionIndexMeta::Type TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::PartitionIndexMeta::Default = 0; +const TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::CurrentLeaderEpochMeta::Type TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::CurrentLeaderEpochMeta::Default = -1; +const TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::TimestampMeta::Type TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::TimestampMeta::Default = 0; +const TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::MaxNumOffsetsMeta::Type TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::MaxNumOffsetsMeta::Default = 1; + +TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::TListOffsetsPartition() + : PartitionIndex(PartitionIndexMeta::Default) + , CurrentLeaderEpoch(CurrentLeaderEpochMeta::Default) + , Timestamp(TimestampMeta::Default) + , MaxNumOffsets(MaxNumOffsetsMeta::Default) +{} + +void TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition"; + } + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + 
NPrivate::Read<CurrentLeaderEpochMeta>(_readable, _version, CurrentLeaderEpoch); + NPrivate::Read<TimestampMeta>(_readable, _version, Timestamp); + NPrivate::Read<MaxNumOffsetsMeta>(_readable, _version, MaxNumOffsets); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex); + NPrivate::Write<CurrentLeaderEpochMeta>(_collector, _writable, _version, CurrentLeaderEpoch); + NPrivate::Write<TimestampMeta>(_collector, _writable, _version, Timestamp); + NPrivate::Write<MaxNumOffsetsMeta>(_collector, _writable, _version, MaxNumOffsets); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex); + NPrivate::Size<CurrentLeaderEpochMeta>(_collector, _version, CurrentLeaderEpoch); + NPrivate::Size<TimestampMeta>(_collector, _version, Timestamp); + 
NPrivate::Size<MaxNumOffsetsMeta>(_collector, _version, MaxNumOffsets); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TListOffsetsResponseData +// +const TListOffsetsResponseData::ThrottleTimeMsMeta::Type TListOffsetsResponseData::ThrottleTimeMsMeta::Default = 0; + +TListOffsetsResponseData::TListOffsetsResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) +{} + +void TListOffsetsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TListOffsetsResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TListOffsetsResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, 
MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TListOffsetsResponseData::TListOffsetsTopicResponse +// +const TListOffsetsResponseData::TListOffsetsTopicResponse::NameMeta::Type TListOffsetsResponseData::TListOffsetsTopicResponse::NameMeta::Default = {""}; + +TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsTopicResponse() + : Name(NameMeta::Default) +{} + +void TListOffsetsResponseData::TListOffsetsTopicResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TListOffsetsResponseData::TListOffsetsTopicResponse"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TListOffsetsResponseData::TListOffsetsTopicResponse::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if 
(!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsResponseData::TListOffsetsTopicResponse"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsResponseData::TListOffsetsTopicResponse::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse +// +const TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::PartitionIndexMeta::Type TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::PartitionIndexMeta::Default = 0; +const TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::ErrorCodeMeta::Type TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::ErrorCodeMeta::Default = 0; +const TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::TimestampMeta::Type TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::TimestampMeta::Default = -1; +const TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::OffsetMeta::Type 
TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::OffsetMeta::Default = -1; +const TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::LeaderEpochMeta::Type TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::LeaderEpochMeta::Default = -1; + +TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::TListOffsetsPartitionResponse() + : PartitionIndex(PartitionIndexMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , Timestamp(TimestampMeta::Default) + , Offset(OffsetMeta::Default) + , LeaderEpoch(LeaderEpochMeta::Default) +{} + +void TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse"; + } + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<OldStyleOffsetsMeta>(_readable, _version, OldStyleOffsets); + NPrivate::Read<TimestampMeta>(_readable, _version, Timestamp); + NPrivate::Read<OffsetMeta>(_readable, _version, Offset); + NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void 
TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<OldStyleOffsetsMeta>(_collector, _writable, _version, OldStyleOffsets); + NPrivate::Write<TimestampMeta>(_collector, _writable, _version, Timestamp); + NPrivate::Write<OffsetMeta>(_collector, _writable, _version, Offset); + NPrivate::Write<LeaderEpochMeta>(_collector, _writable, _version, LeaderEpoch); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionResponse::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<OldStyleOffsetsMeta>(_collector, _version, OldStyleOffsets); + NPrivate::Size<TimestampMeta>(_collector, _version, Timestamp); + NPrivate::Size<OffsetMeta>(_collector, _version, Offset); + NPrivate::Size<LeaderEpochMeta>(_collector, _version, LeaderEpoch); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// // TMetadataRequestData // const 
TMetadataRequestData::AllowAutoTopicCreationMeta::Type TMetadataRequestData::AllowAutoTopicCreationMeta::Default = true; diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index 6c795e4bf75..0b6552f9a10 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -18,6 +18,7 @@ enum EApiKey { HEADER = -1, // [] PRODUCE = 0, // [ZK_BROKER, BROKER] FETCH = 1, // [ZK_BROKER, BROKER, CONTROLLER] + LIST_OFFSETS = 2, // [ZK_BROKER, BROKER] METADATA = 3, // [ZK_BROKER, BROKER] SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER] API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER] @@ -1530,6 +1531,406 @@ public: }; +class TListOffsetsRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TListOffsetsRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsRequestData(); + ~TListOffsetsRequestData() = default; + + class TListOffsetsTopic : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsTopic(); + ~TListOffsetsTopic() = default; + + class TListOffsetsPartition : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsPartition(); + ~TListOffsetsPartition() = default; + + struct PartitionIndexMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "partitionIndex"; + static constexpr const char* About = "The partition index."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr 
TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + PartitionIndexMeta::Type PartitionIndex; + + struct CurrentLeaderEpochMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "currentLeaderEpoch"; + static constexpr const char* About = "The current leader epoch."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + CurrentLeaderEpochMeta::Type CurrentLeaderEpoch; + + struct TimestampMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "timestamp"; + static constexpr const char* About = "The current timestamp."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + TimestampMeta::Type Timestamp; + + struct MaxNumOffsetsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "maxNumOffsets"; + static constexpr const char* About = "The maximum number of offsets to report."; + static const Type Default; // = 1; + + static constexpr TKafkaVersions PresentVersions = {0, 0}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, 
Max<TKafkaVersion>()}; + }; + MaxNumOffsetsMeta::Type MaxNumOffsets; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsPartition& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct PartitionsMeta { + using ItemType = TListOffsetsPartition; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TListOffsetsPartition>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = "Each partition in the request."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsTopic& other) const = default; + }; + + struct ReplicaIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "replicaId"; 
+ static constexpr const char* About = "The broker ID of the requestor, or -1 if this request is being made by a normal consumer."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ReplicaIdMeta::Type ReplicaId; + + struct IsolationLevelMeta { + using Type = TKafkaInt8; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "isolationLevel"; + static constexpr const char* About = "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records"; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {2, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + IsolationLevelMeta::Type IsolationLevel; + + struct TopicsMeta { + using ItemType = TListOffsetsTopic; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TListOffsetsTopic>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "Each topic in the request."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr 
TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + TopicsMeta::Type Topics; + + i16 ApiKey() const override { return LIST_OFFSETS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsRequestData& other) const = default; +}; + + +class TListOffsetsResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TListOffsetsResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsResponseData(); + ~TListOffsetsResponseData() = default; + + class TListOffsetsTopicResponse : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsTopicResponse(); + ~TListOffsetsTopicResponse() = default; + + class TListOffsetsPartitionResponse : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TListOffsetsPartitionResponse(); + ~TListOffsetsPartitionResponse() = default; + + struct PartitionIndexMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "partitionIndex"; + static constexpr const char* About = "The partition index."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions 
NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + PartitionIndexMeta::Type PartitionIndex; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The partition error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct OldStyleOffsetsMeta { + using ItemType = TKafkaInt64; + using ItemTypeDesc = NPrivate::TKafkaIntDesc; + using Type = std::vector<TKafkaInt64>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "oldStyleOffsets"; + static constexpr const char* About = "The result offsets."; + + static constexpr TKafkaVersions PresentVersions = {0, 0}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + OldStyleOffsetsMeta::Type OldStyleOffsets; + + struct TimestampMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "timestamp"; + static constexpr const char* About = "The timestamp associated with the returned offset."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; 
+ TimestampMeta::Type Timestamp; + + struct OffsetMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "offset"; + static constexpr const char* About = "The returned offset."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + OffsetMeta::Type Offset; + + struct LeaderEpochMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "leaderEpoch"; + static constexpr const char* About = ""; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + LeaderEpochMeta::Type LeaderEpoch; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsPartitionResponse& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name"; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; 
+ NameMeta::Type Name; + + struct PartitionsMeta { + using ItemType = TListOffsetsPartitionResponse; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TListOffsetsPartitionResponse>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = "Each partition in the response."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsTopicResponse& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {2, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct TopicsMeta { + using ItemType = TListOffsetsTopicResponse; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TListOffsetsTopicResponse>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static 
constexpr const char* About = "Each topic in the response."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + TopicsMeta::Type Topics; + + i16 ApiKey() const override { return LIST_OFFSETS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TListOffsetsResponseData& other) const = default; +}; + + class TMetadataRequestData : public TApiMessage { public: typedef std::shared_ptr<TMetadataRequestData> TPtr; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 43938b9439b..a60126e69c0 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -365,6 +365,26 @@ public: return WriteAndRead<TProduceResponseData>(header, request); } + TListOffsetsResponseData::TPtr ListOffsets(ui64 partitionsCount, const TString& topic) { + Cerr << ">>>>> TListOffsetsResponseData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4); + + TListOffsetsRequestData request; + request.IsolationLevel = 0; + request.ReplicaId = 0; + NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{}; + newTopic.Name = topic; + for(ui64 i = 0; i < partitionsCount; i++) { + NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{}; + newPartition.PartitionIndex = i; + newPartition.Timestamp = -2; + newTopic.Partitions.emplace_back(newPartition); + } + request.Topics.emplace_back(newTopic); + return WriteAndRead<TListOffsetsResponseData>(header, request); + } + void UnknownApiKey() { Cerr << ">>>>> Unknown apiKey\n"; @@ -419,6 +439,7 @@ 
Y_UNIT_TEST_SUITE(KafkaProtocol) { TInsecureTestServer testServer("2"); TString topicName = "/Root/topic-0-test"; + ui64 minActivePartitions = 10; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); { @@ -426,7 +447,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { pqClient .CreateTopic(topicName, NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100) + .PartitioningSettings(minActivePartitions, 100) .BeginAddConsumer("consumer-0").EndAddConsumer()) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); @@ -462,6 +483,15 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { + auto msg = client.ListOffsets(minActivePartitions, topicName); + for (auto& topic: msg->Topics) { + for (auto& partition: topic.Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + } + } + + { auto msg = client.InitProducerId(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 0be9f1bc89a..58175e06add 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -8,6 +8,8 @@ SRCS( actors/kafka_sasl_auth_actor.cpp actors/kafka_sasl_handshake_actor.cpp actors/kafka_metrics_actor.cpp + actors/kafka_list_offsets_actor.cpp + actors/kafka_topic_offsets_actor.cpp kafka_connection.cpp kafka_connection.h kafka_listener.h diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a9c916358e1..906ebfe948a 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1,4 +1,4 @@ - #include "schema_actors.h" +#include "schema_actors.h" #include "persqueue_utils.h" @@ -530,6 +530,7 @@ void TDescribeTopicActorImpl::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev auto it = Tablets.find(ev->Get()->TabletId); if (it == Tablets.end()) return; 
it->second.NodeId = ev->Get()->ServerId.NodeId(); + it->second.Generation = ev->Get()->Generation; } } @@ -1232,9 +1233,12 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: bool TDescribeTopicActorImpl::ProcessTablets( const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx ) { + std::unordered_set<ui32> partitionSet(Settings.Partitions.begin(), Settings.Partitions.end()); auto partitionFilter = [&] (ui32 partId) { if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribePartitions) { return Settings.RequireStats && partId == Settings.Partitions[0]; + } else if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeTopic) { + return Settings.RequireStats && (partitionSet.empty() || partitionSet.find(partId) != partitionSet.end()); } else { return Settings.RequireStats; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 2ddb53c59e2..22f464da967 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -85,8 +85,10 @@ struct TDescribeTopicActorSettings { , RequireLocation(requireLocation) {} - static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation) { - return TDescribeTopicActorSettings{EMode::DescribeTopic, requireStats, requireLocation}; + static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation, const TVector<ui32>& partitions = {}) { + TDescribeTopicActorSettings res{EMode::DescribeTopic, requireStats, requireLocation}; + res.Partitions = partitions; + return res; } static TDescribeTopicActorSettings DescribeConsumer(const TString& consumer, bool requireStats, bool requireLocation) @@ -114,13 +116,13 @@ class TDescribeTopicActorImpl { protected: struct TTabletInfo { - ui64 TabletId; + ui64 TabletId = 0; std::vector<ui32> Partitions; TActorId Pipe; ui32 NodeId = 0; ui32 RetriesLeft = 3; bool 
ResultRecived = false; - + ui64 Generation = 0; TTabletInfo() = default; TTabletInfo(ui64 tabletId) : TabletId(tabletId) |