diff options
author | Sergey Veselov <124132947+siarheivesialou@users.noreply.github.com> | 2024-01-10 15:24:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-10 15:24:42 +0300 |
commit | c6ed37f07480b3b223dc280687816f891a421783 (patch) | |
tree | 839e1d4dc700cb873889675182a47e6c74cceb86 | |
parent | 5c509d39f4e9df248bb28328bbb1e0128d6b8bdf (diff) | |
download | ydb-c6ed37f07480b3b223dc280687816f891a421783.tar.gz |
LOGBROKER-8860 Implement CreatePartitions Kafka API endpoint (#866)
-rw-r--r-- | ydb/core/kafka_proxy/actors/actors.h | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp | 482 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h | 41 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp | 42 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h | 6 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 5 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.cpp | 309 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.h | 278 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 172 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ya.make | 1 |
11 files changed, 1321 insertions, 24 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 2e1c27ee85..e1c287a865 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -163,5 +163,6 @@ NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, c NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message); NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message); NActors::IActor* CreateKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreateTopicsRequestData>& message); +NActors::IActor* CreateKafkaCreatePartitionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreatePartitionsRequestData>& message); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp new file mode 100644 index 0000000000..6f98326a58 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -0,0 +1,482 @@ +#include "kafka_create_partitions_actor.h" + +#include <ydb/core/kafka_proxy/kafka_events.h> + +#include <ydb/services/lib/actors/pq_schema_actor.h> + +#include <ydb/core/kafka_proxy/kafka_constants.h> + + +namespace NKafka { + +class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCtx { +public: + using TRequest = TKafkaCreatePartitionsRequest; + + TKafkaCreatePartitionsRequest( + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + TString topicPath, + TString databaseName, + const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback) + : UserToken(userToken) + , TopicPath(topicPath) + , DatabaseName(databaseName) + , SendResultCallback(sendResultCallback) + { + }; + + const TString path() const { + return TopicPath; + } + + TMaybe<TString> GetTraceId() const override { + return Nothing(); + } + + const TMaybe<TString> GetDatabaseName() const override { + return DatabaseName; + } + + const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override { + return UserToken; + } + + const TString& GetSerializedToken() const override { + return UserToken->GetSerializedToken(); + } + + bool IsClientLost() const override { + return false; + }; + + virtual const google::protobuf::Message* GetRequest() const override { + return nullptr; + }; + + const TMaybe<TString> GetRequestType() const override { + return Nothing(); + }; + + void SetFinishAction(std::function<void()>&& cb) override { + Y_UNUSED(cb); + }; + + google::protobuf::Arena* GetArena() override { + return nullptr; + }; + + bool HasClientCapability(const TString& capability) const override { + Y_UNUSED(capability); + return false; + }; + + void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { + processYdbStatusCode(status); + }; + + void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override { + Y_UNUSED(code); + Y_UNUSED(msg); + Y_UNUSED(details); + } + + TString GetPeerName() const override { + return ""; + } + + TInstant GetDeadline() const override { + return TInstant(); + } + + const TMaybe<TString> GetPeerMetaValues(const TString&) const override { + return Nothing(); + } + + TVector<TStringBuf> FindClientCert() const override { + return TVector<TStringBuf>(); + } + + TMaybe<NKikimr::NRpcService::TRlPath> GetRlPath() const override { + return Nothing(); + } + + void RaiseIssue(const NYql::TIssue& issue) override{ + ReplyMessage = issue.GetMessage(); + Y_UNUSED(issue); + } + + void RaiseIssues(const NYql::TIssues& issues) override { + Y_UNUSED(issues); + }; + + const TString& GetRequestName() const override { + return DummyString; + }; + + void SetDiskQuotaExceeded(bool disk) override { + Y_UNUSED(disk); + }; + + bool GetDiskQuotaExceeded() const override { + return false; + }; + + void AddAuditLogPart(const TStringBuf& name, const TString& value) override { + Y_UNUSED(name); + Y_UNUSED(value); + }; + + const NKikimr::NGRpcService::TAuditLogParts& GetAuditLogParts() const override { + return DummyAuditLogParts; + }; + + google::protobuf::Message* GetRequestMut() override { + return nullptr; + }; + + void SetRuHeader(ui64 ru) override { + Y_UNUSED(ru); + }; + + void AddServerHint(const TString& hint) override { + Y_UNUSED(hint); + }; + + void SetCostInfo(float consumed_units) override { + Y_UNUSED(consumed_units); + }; + + void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) override { + Y_UNUSED(cb); + }; + + void FinishStream(ui32 status) override { + Y_UNUSED(status); + }; + + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + Y_UNUSED(in); + Y_UNUSED(status); + }; + + void Reply(NProtoBuf::Message* resp, ui32 status = 0) override { + Y_UNUSED(resp); + Y_UNUSED(status); + }; + + void SendOperation(const Ydb::Operations::Operation& operation) override { + Y_UNUSED(operation); + }; + + NWilson::TTraceId GetWilsonTraceId() const override { + return {}; + } + + void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { + Y_UNUSED(result); + processYdbStatusCode(status); + }; + + void SendResult( + const google::protobuf::Message& result, + Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override { + + Y_UNUSED(result); + Y_UNUSED(message); + processYdbStatusCode(status, std::optional(std::ref(message))); + }; + + void SendResult( + Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override { + + Y_UNUSED(message); + processYdbStatusCode(status, std::optional(std::ref(message))); + }; + + const Ydb::Operations::OperationParams& operation_params() const { + return DummyParams; + } + + static TKafkaCreatePartitionsRequest* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) { + return static_cast<TKafkaCreatePartitionsRequest*>(&(*request)); + } + +protected: + void FinishRequest() override { + }; + +private: + const Ydb::Operations::OperationParams DummyParams; + const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + const TString DummyString; + const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts; + const TString TopicPath; + const TString DatabaseName; + const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback; + TString ReplyMessage; + + void processYdbStatusCode( + Ydb::StatusIds::StatusCode& status, + std::optional<std::reference_wrapper<const google::protobuf::RepeatedPtrField< + NKikimr::NGRpcService::TYdbIssueMessageType>>> issueMessagesOpt = std::nullopt) { + + switch (status) { + case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS: + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage); + break; + case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage); + break; + case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR: + if (issueMessagesOpt.has_value()) { + auto& issueMessages = issueMessagesOpt.value().get(); + bool hasPathNotExists = std::find_if( + issueMessages.begin(), + issueMessages.end(), + [](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; } + ) != issueMessages.end(); + + if (hasPathNotExists) { + SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage); + } else { + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); + } + } else { + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); + } + break; + default: + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); + } + } +}; + +class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest> { + using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>; +public: + + TCreatePartitionsActor( + TActorId requester, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + TString topicPath, + TString databaseName, + ui32 partitionsNumber) + : TBase(new TKafkaCreatePartitionsRequest( + userToken, + topicPath, + databaseName, + [this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { + this->SendResult(status, message); + }) + ) + , Requester(requester) + , TopicPath(topicPath) + , PartionsNumber(partitionsNumber) + { + KAFKA_LOG_D( + "Create Topic actor. DatabaseName: " << databaseName << + ". TopicPath: " << TopicPath << + ". PartitionsNumber: " << PartionsNumber); + }; + + ~TCreatePartitionsActor() = default; + + void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { + THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse()); + response->Status = status; + response->TopicPath = TopicPath; + response->Message = message; + Send(Requester, response.Release()); + Send(SelfId(), new TEvents::TEvPoison()); + } + + void FillProposeRequest( + NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal, + const TActorContext &ctx, + const TString &workingDir, + const TString &name) { + + Y_UNUSED(ctx); + + NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); + modifyScheme.SetWorkingDir(workingDir); + modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); + + auto pqDescr = modifyScheme.MutableAlterPersQueueGroup(); + (*pqDescr).SetTotalGroupCount(PartionsNumber); + (*pqDescr).SetName(name); + }; + + void Bootstrap(const NActors::TActorContext& ctx) { + TBase::Bootstrap(ctx); + SendProposeRequest(ctx); + Become(&TCreatePartitionsActor::StateWork); + }; + + void StateWork(TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle); + default: + TBase::StateWork(ev); + } + } + + void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } + +private: + const TActorId Requester; + const TString TopicPath; + const std::shared_ptr<TString> SerializedToken; + const ui32 PartionsNumber; +}; + +NActors::IActor* CreateKafkaCreatePartitionsActor( + const TContext::TPtr context, + const ui64 correlationId, + const TMessagePtr<TCreatePartitionsRequestData>& message +) { + return new TKafkaCreatePartitionsActor(context, correlationId, message); +} + +void TKafkaCreatePartitionsActor::Bootstrap(const NActors::TActorContext& ctx) { + KAFKA_LOG_D(InputLogMessage()); + + if (Message->ValidateOnly) { + ProcessValidateOnly(ctx); + return; + } + + std::unordered_set<TString> topicNames; + for (auto& topic : Message->Topics) { + auto& topicName = topic.Name.value(); + if (topicNames.contains(topicName)) { + DuplicateTopicNames.insert(topicName); + } else { + topicNames.insert(topicName); + } + } + + for (auto& topic : Message->Topics) { + auto& topicName = topic.Name.value(); + + if (DuplicateTopicNames.contains(topicName)) { + continue; + } + + if (topicName == "") { + auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>(); + result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST; + result->Message = "Empty topic name"; + this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release()); + continue; + } + + ctx.Register(new TCreatePartitionsActor( + SelfId(), + Context->UserToken, + topic.Name.value(), + Context->DatabasePath, + topic.Count + )); + + InflyTopics++; + } + + if (InflyTopics > 0) { + Become(&TKafkaCreatePartitionsActor::StateWork); + } else { + Reply(ctx); + } +}; + +void TKafkaCreatePartitionsActor::Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx) { + auto eventPtr = ev->Release(); + TopicNamesToResponses[eventPtr->TopicPath] = eventPtr; + InflyTopics--; + if (InflyTopics == 0) { + Reply(ctx); + } +}; + +void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) { + TCreatePartitionsResponseData::TPtr response = std::make_shared<TCreatePartitionsResponseData>(); + EKafkaErrors responseStatus = NONE_ERROR; + + for (auto& requestTopic : Message->Topics) { + auto topicName = requestTopic.Name.value(); + + TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic; + responseTopic.Name = topicName; + + if (TopicNamesToResponses.contains(topicName)) { + responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message; + } + + auto setError= [&responseTopic, &responseStatus](EKafkaErrors status) { + responseTopic.ErrorCode = status; + responseStatus = status; + }; + + if (DuplicateTopicNames.contains(topicName)) { + setError(DUPLICATE_RESOURCE); + } else { + switch (TopicNamesToResponses[topicName]->Status) { + case TEvKafka::TEvTopicModificationResponse::OK: + responseTopic.ErrorCode = NONE_ERROR; + break; + case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST: + case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST: + setError(INVALID_REQUEST); + break; + case TEvKafka::TEvTopicModificationResponse::ERROR: + setError(UNKNOWN_SERVER_ERROR); + break; + case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG: + setError(INVALID_CONFIG); + break; + } + } + response->Results.push_back(responseTopic); + } + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); + + Die(ctx); +}; + +TStringBuilder TKafkaCreatePartitionsActor::InputLogMessage() { + TStringBuilder stringBuilder; + stringBuilder << "Create partitions actor: New request. ValidateOnly:" << (Message->ValidateOnly != 0) << " Topics: ["; + + bool isFirst = true; + for (auto& requestTopic : Message->Topics) { + if (isFirst) { + isFirst = false; + } else { + stringBuilder << ","; + } + stringBuilder << " " << requestTopic.Name.value(); + } + stringBuilder << " ]"; + return stringBuilder; +}; + + +void TKafkaCreatePartitionsActor::ProcessValidateOnly(const NActors::TActorContext& ctx) { + TCreatePartitionsResponseData::TPtr response = std::make_shared<TCreatePartitionsResponseData>(); + + for (auto& requestTopic : Message->Topics) { + auto topicName = requestTopic.Name.value(); + + TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic; + responseTopic.Name = topicName; + responseTopic.ErrorCode = NONE_ERROR; + response->Results.push_back(responseTopic); + } + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, NONE_ERROR)); + Die(ctx); +}; +} diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h new file mode 100644 index 0000000000..60ea85e078 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h @@ -0,0 +1,41 @@ +#include "actors.h" +#include <ydb/core/kafka_proxy/kafka_events.h> + +#include <ydb/library/actors/core/actor_bootstrapped.h> + +namespace NKafka { + +class TKafkaCreatePartitionsActor: public NActors::TActorBootstrapped<TKafkaCreatePartitionsActor> { +public: + TKafkaCreatePartitionsActor( + const TContext::TPtr context, + const ui64 correlationId, + const TMessagePtr<TCreatePartitionsRequestData>& message) + : Context(context) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + void Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx); + void Reply(const TActorContext& ctx); + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvTopicModificationResponse, Handle); + } + } + +private: + const TContext::TPtr Context; + const ui64 CorrelationId; + const TMessagePtr<TCreatePartitionsRequestData> Message; + std::unordered_set<TString> DuplicateTopicNames; + ui32 InflyTopics = 0; + std::unordered_map<TString, TAutoPtr<TEvKafka::TEvTopicModificationResponse>> TopicNamesToResponses; + + TStringBuilder InputLogMessage(); + void ProcessValidateOnly(const NActors::TActorContext& ctx); +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index 5702f844a2..ecb3ddd1b3 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -17,7 +17,7 @@ public: TIntrusiveConstPtr<NACLib::TUserToken> userToken, TString topicPath, TString databaseName, - const std::function<void(TEvKafka::TEvCreateTopicsResponse::EStatus, TString&)> sendResultCallback) + const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback) : UserToken(userToken) , TopicPath(topicPath) , DatabaseName(databaseName) @@ -211,19 +211,19 @@ private: const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts; const TString TopicPath; const TString DatabaseName; - const std::function<void(TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message)> SendResultCallback; + const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback; TString ReplyMessage; void processYdbStatusCode(Ydb::StatusIds::StatusCode& status) { switch (status) { case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS: - SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::OK, ReplyMessage); + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage); break; case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: - SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST, ReplyMessage); + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage); break; default: - SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::ERROR, ReplyMessage); + SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); } } }; @@ -244,7 +244,7 @@ public: userToken, topicPath, databaseName, - [this](TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) { + [this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { this->SendResult(status, message); }) ) @@ -259,8 +259,8 @@ public: ~TCreateTopicActor() = default; - void SendResult(TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) { - THolder<TEvKafka::TEvCreateTopicsResponse> response(new TEvKafka::TEvCreateTopicsResponse()); + void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { + THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse()); response->Status = status; response->TopicPath = TopicPath; response->Message = message; @@ -377,10 +377,10 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { } if (topicName == "") { - auto result = MakeHolder<TEvKafka::TEvCreateTopicsResponse>(); - result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST; + auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>(); + result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST; result->Message = "Empty topic name"; - this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvCreateTopicsResponse>(result.Release()); + this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release()); continue; } @@ -394,10 +394,10 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { retention = std::stoul(config.Value.value()); return true; } catch(std::invalid_argument) { - auto result = MakeHolder<TEvKafka::TEvCreateTopicsResponse>(); - result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::INVALID_CONFIG; + auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>(); + result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::INVALID_CONFIG; result->Message = "Provided retention value is not a number"; - this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr<TEvKafka::TEvCreateTopicsResponse>(result.Release()); + this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release()); return false; } }; @@ -446,7 +446,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { } }; -void TKafkaCreateTopicsActor::Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx) { +void TKafkaCreateTopicsActor::Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx) { auto eventPtr = ev->Release(); KAFKA_LOG_D(TStringBuilder() << "Create topics actor. Topic's " << eventPtr->TopicPath << " response received." << std::to_string(eventPtr->Status)); TopicNamesToResponses[eventPtr->TopicPath] = eventPtr; @@ -490,18 +490,22 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) { setError(DUPLICATE_RESOURCE); } else { switch (TopicNamesToResponses[topicName]->Status) { - case TEvKafka::TEvCreateTopicsResponse::OK: + case TEvKafka::TEvTopicModificationResponse::OK: responseTopic.ErrorCode = NONE_ERROR; addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME); addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME); break; - case TEvKafka::TEvCreateTopicsResponse::BAD_REQUEST: + case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST: setError(INVALID_REQUEST); break; - case TEvKafka::TEvCreateTopicsResponse::ERROR: + case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST: + KAFKA_LOG_ERROR("Create topics actor: Topic: [" << topicName << "]. Unexpected TOPIC_DOES_NOT_EXIST status received."); setError(UNKNOWN_SERVER_ERROR); break; - case TEvKafka::TEvCreateTopicsResponse::INVALID_CONFIG: + case TEvKafka::TEvTopicModificationResponse::ERROR: + setError(UNKNOWN_SERVER_ERROR); + break; + case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG: setError(INVALID_CONFIG); break; } diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h index afa1657311..fad6dd15a2 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h @@ -14,12 +14,12 @@ public: } void Bootstrap(const NActors::TActorContext& ctx); - void Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx); + void Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx); void Reply(const TActorContext& ctx); STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(TEvKafka::TEvCreateTopicsResponse, Handle); + HFunc(TEvKafka::TEvTopicModificationResponse, Handle); } } @@ -29,7 +29,7 @@ private: const TMessagePtr<TCreateTopicsRequestData> Message; std::unordered_set<TString> DuplicateTopicNames; ui32 InflyTopics = 0; - std::unordered_map<TString, TAutoPtr<TEvKafka::TEvCreateTopicsResponse>> TopicNamesToResponses; + std::unordered_map<TString, TAutoPtr<TEvKafka::TEvTopicModificationResponse>> TopicNamesToResponses; std::unordered_map<TString, std::pair<std::optional<ui64>, std::optional<ui64>>> TopicNamesToRetentions; TStringBuilder InputLogMessage(); diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 57e728f548..de4ca12a8d 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -306,6 +306,10 @@ protected: Register(CreateKafkaCreateTopicsActor(Context, header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TCreatePartitionsRequestData>& message) { + Register(CreateKafkaCreatePartitionsActor(Context, header->CorrelationId, message)); + } + template<class T> TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) { return TMessagePtr<T>(request->Buffer, request->Message); @@ -390,6 +394,10 @@ protected: HandleMessage(&Request->Header, Cast<TCreateTopicsRequestData>(Request)); break; + case CREATE_PARTITIONS: + HandleMessage(&Request->Header, Cast<TCreatePartitionsRequestData>(Request)); + break; + default: KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey); PassAway(); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index e2132363a3..1bec0c622a 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -228,7 +228,7 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets; }; -struct TEvCreateTopicsResponse : public NActors::TEventLocal<TEvCreateTopicsResponse, EvCreateTopicsResponse> +struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse> , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase { enum EStatus { @@ -236,9 +236,10 @@ struct TEvCreateTopicsResponse : public NActors::TEventLocal<TEvCreateTopicsResp ERROR, BAD_REQUEST, INVALID_CONFIG, + TOPIC_DOES_NOT_EXIST, }; - TEvCreateTopicsResponse() + TEvTopicModificationResponse() {} TString TopicPath; diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 01b9c8c00c..8b08d98ce4 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -24,6 +24,7 @@ const std::unordered_map<EApiKey, TString> EApiKeyNames = { {EApiKey::CREATE_TOPICS, "CREATE_TOPICS"}, {EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"}, {EApiKey::SASL_AUTHENTICATE, "SASL_AUTHENTICATE"}, + {EApiKey::CREATE_PARTITIONS, "CREATE_PARTITIONS"}, }; @@ -61,6 +62,8 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) { return std::make_unique<TInitProducerIdRequestData>(); case SASL_AUTHENTICATE: return std::make_unique<TSaslAuthenticateRequestData>(); + case CREATE_PARTITIONS: + return std::make_unique<TCreatePartitionsRequestData>(); default: ythrow yexception() << "Unsupported request API key " << apiKey; } @@ -100,6 +103,8 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) { return std::make_unique<TInitProducerIdResponseData>(); case SASL_AUTHENTICATE: return std::make_unique<TSaslAuthenticateResponseData>(); + case CREATE_PARTITIONS: + return std::make_unique<TCreatePartitionsResponseData>(); default: ythrow yexception() << "Unsupported response API key " << apiKey; } @@ -199,6 +204,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case CREATE_PARTITIONS: + if (_version >= 2) { + return 2; + } else { + return 1; + } default: ythrow yexception() << "Unsupported API key " << apiKey; break; @@ -297,6 +308,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 0; } + case CREATE_PARTITIONS: + if (_version >= 2) { + return 1; + } else { + return 0; + } default: ythrow yexception() << "Unsupported API key " << apiKey; break; @@ -5667,4 +5684,296 @@ i32 TSaslAuthenticateResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } + + +// +// TCreatePartitionsRequestData +// +const TCreatePartitionsRequestData::TimeoutMsMeta::Type TCreatePartitionsRequestData::TimeoutMsMeta::Default = 0; +const TCreatePartitionsRequestData::ValidateOnlyMeta::Type TCreatePartitionsRequestData::ValidateOnlyMeta::Default = false; + +TCreatePartitionsRequestData::TCreatePartitionsRequestData() + : TimeoutMs(TimeoutMsMeta::Default) + , ValidateOnly(ValidateOnlyMeta::Default) +{} + +void TCreatePartitionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData"; + } + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + NPrivate::Read<TimeoutMsMeta>(_readable, _version, TimeoutMs); + NPrivate::Read<ValidateOnlyMeta>(_readable, _version, ValidateOnly); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreatePartitionsRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + NPrivate::Write<TimeoutMsMeta>(_collector, _writable, _version, TimeoutMs); + NPrivate::Write<ValidateOnlyMeta>(_collector, _writable, _version, ValidateOnly); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreatePartitionsRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + NPrivate::Size<TimeoutMsMeta>(_collector, _version, TimeoutMs); + NPrivate::Size<ValidateOnlyMeta>(_collector, _version, ValidateOnly); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreatePartitionsRequestData::TCreatePartitionsTopic +// +const TCreatePartitionsRequestData::TCreatePartitionsTopic::NameMeta::Type TCreatePartitionsRequestData::TCreatePartitionsTopic::NameMeta::Default = {""}; +const TCreatePartitionsRequestData::TCreatePartitionsTopic::CountMeta::Type TCreatePartitionsRequestData::TCreatePartitionsTopic::CountMeta::Default = 0; + +TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsTopic() + : Name(NameMeta::Default) + , Count(CountMeta::Default) +{} + +void TCreatePartitionsRequestData::TCreatePartitionsTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<CountMeta>(_readable, _version, Count); + NPrivate::Read<AssignmentsMeta>(_readable, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreatePartitionsRequestData::TCreatePartitionsTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<CountMeta>(_collector, _writable, _version, Count); + NPrivate::Write<AssignmentsMeta>(_collector, _writable, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreatePartitionsRequestData::TCreatePartitionsTopic::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<CountMeta>(_collector, _version, Count); + NPrivate::Size<AssignmentsMeta>(_collector, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment +// + +TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::TCreatePartitionsAssignment() +{} + +void TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment"; + } + NPrivate::Read<BrokerIdsMeta>(_readable, _version, BrokerIds); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<BrokerIdsMeta>(_collector, _writable, _version, BrokerIds); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<BrokerIdsMeta>(_collector, _version, BrokerIds); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreatePartitionsResponseData +// +const TCreatePartitionsResponseData::ThrottleTimeMsMeta::Type TCreatePartitionsResponseData::ThrottleTimeMsMeta::Default = 0; + +TCreatePartitionsResponseData::TCreatePartitionsResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) +{} + +void TCreatePartitionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ResultsMeta>(_readable, _version, Results); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreatePartitionsResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ResultsMeta>(_collector, _writable, _version, Results); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreatePartitionsResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ResultsMeta>(_collector, _version, Results); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreatePartitionsResponseData::TCreatePartitionsTopicResult +// +const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::NameMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::NameMeta::Default = {""}; +const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorCodeMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorCodeMeta::Default = 0; +const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorMessageMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorMessageMeta::Default = std::nullopt; + +TCreatePartitionsResponseData::TCreatePartitionsTopicResult::TCreatePartitionsTopicResult() + : Name(NameMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , ErrorMessage(ErrorMessageMeta::Default) +{} + +void TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsResponseData::TCreatePartitionsTopicResult"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsResponseData::TCreatePartitionsTopicResult"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} } //namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index bbe56aabea..1b547a4c44 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -32,6 +32,7 @@ enum EApiKey { CREATE_TOPICS = 19, // [ZK_BROKER, BROKER, CONTROLLER] INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER] SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER] + CREATE_PARTITIONS = 37, // [ZK_BROKER, BROKER, CONTROLLER] }; extern const std::unordered_map<EApiKey, TString> EApiKeyNames; @@ -6181,4 +6182,281 @@ public: bool operator==(const TSaslAuthenticateResponseData& other) const = default; }; + +class TCreatePartitionsRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TCreatePartitionsRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TCreatePartitionsRequestData(); + ~TCreatePartitionsRequestData() = default; + + class TCreatePartitionsTopic : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TCreatePartitionsTopic(); + ~TCreatePartitionsTopic() = default; + + class TCreatePartitionsAssignment : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TCreatePartitionsAssignment(); + ~TCreatePartitionsAssignment() = default; + + struct BrokerIdsMeta { + using ItemType = TKafkaInt32; + using ItemTypeDesc = NPrivate::TKafkaIntDesc; + using Type = std::vector<TKafkaInt32>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "brokerIds"; + static constexpr const char* About = "The assigned broker IDs."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + BrokerIdsMeta::Type BrokerIds; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatePartitionsAssignment& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct CountMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "count"; + static constexpr const char* About = "The new partition count."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + CountMeta::Type Count; + + struct AssignmentsMeta { + using ItemType = TCreatePartitionsAssignment; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TCreatePartitionsAssignment>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "assignments"; + static constexpr const char* About = "The new partition assignments."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + AssignmentsMeta::Type Assignments; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatePartitionsTopic& other) const = default; + }; + + struct TopicsMeta { + using ItemType = TCreatePartitionsTopic; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TCreatePartitionsTopic>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "Each topic that we want to create new partitions inside."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + TopicsMeta::Type Topics; + + struct TimeoutMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "timeoutMs"; + static constexpr const char* About = "The time in ms to wait for the partitions to be created."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + TimeoutMsMeta::Type TimeoutMs; + + struct ValidateOnlyMeta { + using Type = TKafkaBool; + using TypeDesc = NPrivate::TKafkaBoolDesc; + + static constexpr const char* Name = "validateOnly"; + static constexpr const char* About = "If true, then validate the request, but don't actually increase the number of partitions."; + static const Type Default; // = false; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ValidateOnlyMeta::Type ValidateOnly; + + i16 ApiKey() const override { return CREATE_PARTITIONS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatePartitionsRequestData& other) const = default; +}; + + +class TCreatePartitionsResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TCreatePartitionsResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TCreatePartitionsResponseData(); + ~TCreatePartitionsResponseData() = default; + + class TCreatePartitionsTopicResult : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TCreatePartitionsTopicResult(); + ~TCreatePartitionsTopicResult() = default; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The result error, or zero if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ErrorMessageMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "errorMessage"; + static constexpr const char* About = "The result message, or null if there was no error."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ErrorMessageMeta::Type ErrorMessage; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatePartitionsTopicResult& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ResultsMeta { + using ItemType = TCreatePartitionsTopicResult; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TCreatePartitionsTopicResult>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "results"; + static constexpr const char* About = "The partition creation results for each topic."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ResultsMeta::Type Results; + + i16 ApiKey() const override { return CREATE_PARTITIONS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatePartitionsResponseData& other) const = default; +}; + } // namespace NKafka diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index d590686576..12e906c395 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -634,6 +634,25 @@ public: return WriteAndRead<TCreateTopicsResponseData>(header, request); } + TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TopicToCreate> topicsToCreate, bool validateOnly = false) { + Cerr << ">>>>> TCreateTopicsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3); + TCreatePartitionsRequestData request; + request.ValidateOnly = validateOnly; + request.TimeoutMs = 100; + + for (auto& topicToCreate : topicsToCreate) { + NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic; + topic.Name = topicToCreate.Name; + topic.Count = topicToCreate.PartitionsNumber; + + request.Topics.push_back(topic); + } + + return WriteAndRead<TCreatePartitionsResponseData>(header, request); + } + void UnknownApiKey() { Cerr << ">>>>> Unknown apiKey\n"; @@ -1730,6 +1749,159 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } // Y_UNIT_TEST(CreateTopicsScenario) + Y_UNIT_TEST(CreatePartitionsScenario) { + + TInsecureTestServer testServer("2"); + + TString topic1Name = "/Root/topic-1-test"; + TString shortTopic1Name = "topic-1-test"; + + TString topic2Name = "/Root/topic-2-test"; + TString shortTopic2Name = "topic-2-test"; + + TString key = "record-key"; + TString value = "record-value"; + TString headerKey = "header-key"; + TString headerValue = "header-value"; + + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + { + auto result = + pqClient + .CreateTopic(topic1Name, + NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(10, 100)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + auto result = + pqClient + .CreateTopic(topic2Name, + NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(20, 100)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + TTestClient client(testServer.Port); + + { + auto msg = client.ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 15u); + } + + { + auto msg = client.SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); + + { + // Validate only + auto msg = client.CreatePartitions({ + TopicToCreate(topic1Name, 11), + TopicToCreate(topic2Name, 21) + }, true); + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topic1Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), topic2Name); + + auto result0 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result0.IsSuccess()); + UNIT_ASSERT_EQUAL(result0.GetTopicDescription().GetPartitions().size(), 10); + + auto result1 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result1.IsSuccess()); + UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 20); + } + + { + // Increase partitions number + auto msg = client.CreatePartitions({ + TopicToCreate(shortTopic1Name, 11), + TopicToCreate(shortTopic2Name, 21) + }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), shortTopic2Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].ErrorCode, NONE_ERROR); + + auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result1.IsSuccess()); + UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11); + + auto result2 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result2.IsSuccess()); + UNIT_ASSERT_EQUAL(result2.GetTopicDescription().GetPartitions().size(), 21); + } + + { + // Check with two same topic names + auto msg = client.CreatePartitions({ + TopicToCreate(shortTopic1Name, 11), + TopicToCreate(shortTopic1Name, 11) + }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, DUPLICATE_RESOURCE); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), shortTopic1Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].ErrorCode, DUPLICATE_RESOURCE); + + auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result1.IsSuccess()); + UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11); + + auto result2 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result2.IsSuccess()); + UNIT_ASSERT_EQUAL(result2.GetTopicDescription().GetPartitions().size(), 21); + } + + { + // Check with lesser partitions number + auto msg = client.CreatePartitions({ TopicToCreate(shortTopic1Name, 1) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST); + + auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result1.IsSuccess()); + UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11); + } + + { + // Check with nonexistent topic name + auto topicName = "NonExTopicName"; + auto msg = client.CreatePartitions({ TopicToCreate(topicName, 1) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST); + + auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result1.IsSuccess()); + UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11); + } + } // Y_UNIT_TEST(CreatePartitionsScenario) + Y_UNIT_TEST(LoginWithApiKey) { TInsecureTestServer testServer; diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index a74745d4d8..5567492196 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -16,6 +16,7 @@ SRCS( actors/kafka_offset_fetch_actor.cpp actors/kafka_offset_commit_actor.cpp actors/kafka_create_topics_actor.cpp + actors/kafka_create_partitions_actor.cpp kafka_connection.cpp kafka_connection.h kafka_constants.h |