author     Sergey Veselov <124132947+siarheivesialou@users.noreply.github.com>   2024-01-10 15:24:42 +0300
committer  GitHub <noreply@github.com>                                           2024-01-10 15:24:42 +0300
commit     c6ed37f07480b3b223dc280687816f891a421783 (patch)
tree       839e1d4dc700cb873889675182a47e6c74cceb86
parent     5c509d39f4e9df248bb28328bbb1e0128d6b8bdf (diff)
LOGBROKER-8860 Implement CreatePartitions Kafka API endpoint (#866)
-rw-r--r--  ydb/core/kafka_proxy/actors/actors.h                              1
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp  482
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h     41
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp       42
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h          6
-rw-r--r--  ydb/core/kafka_proxy/kafka_connection.cpp                         8
-rw-r--r--  ydb/core/kafka_proxy/kafka_events.h                               5
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages.cpp                         309
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages.h                           278
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp                         172
-rw-r--r--  ydb/core/kafka_proxy/ya.make                                      1
11 files changed, 1321 insertions, 24 deletions
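
The new endpoint is exercised end-to-end by the protocol unit test added in ydb/core/kafka_proxy/ut/ut_protocol.cpp below. A minimal client-side sketch, assuming the TTestClient helpers Header() and WriteAndRead() from that test (topic name and partition count are illustrative):

    // Build a CreatePartitions request (API key 37, flexible version 3).
    TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3);
    TCreatePartitionsRequestData request;
    request.TimeoutMs = 100;
    request.ValidateOnly = false;

    // One entry per topic whose partition count should grow.
    NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic;
    topic.Name = "topic-1-test";   // hypothetical topic
    topic.Count = 11;              // new total partition count
    request.Topics.push_back(topic);

    // Send the request and read the per-topic results: NONE_ERROR on success,
    // INVALID_REQUEST for a missing topic or a lower partition count,
    // DUPLICATE_RESOURCE when the same topic is listed twice in one request.
    auto response = WriteAndRead<TCreatePartitionsResponseData>(header, request);
    auto errorCode = response->Results[0].ErrorCode;
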
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 2e1c27ee85..e1c287a865 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -163,5 +163,6 @@ NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, c
NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message);
NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message);
NActors::IActor* CreateKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreateTopicsRequestData>& message);
+NActors::IActor* CreateKafkaCreatePartitionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreatePartitionsRequestData>& message);
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
new file mode 100644
index 0000000000..6f98326a58
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
@@ -0,0 +1,482 @@
+#include "kafka_create_partitions_actor.h"
+
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
+#include <ydb/services/lib/actors/pq_schema_actor.h>
+
+#include <ydb/core/kafka_proxy/kafka_constants.h>
+
+
+namespace NKafka {
+
+class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCtx {
+public:
+ using TRequest = TKafkaCreatePartitionsRequest;
+
+ TKafkaCreatePartitionsRequest(
+ TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+ TString topicPath,
+ TString databaseName,
+ const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
+ : UserToken(userToken)
+ , TopicPath(topicPath)
+ , DatabaseName(databaseName)
+ , SendResultCallback(sendResultCallback)
+ {
+ };
+
+ const TString path() const {
+ return TopicPath;
+ }
+
+ TMaybe<TString> GetTraceId() const override {
+ return Nothing();
+ }
+
+ const TMaybe<TString> GetDatabaseName() const override {
+ return DatabaseName;
+ }
+
+ const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override {
+ return UserToken;
+ }
+
+ const TString& GetSerializedToken() const override {
+ return UserToken->GetSerializedToken();
+ }
+
+ bool IsClientLost() const override {
+ return false;
+ };
+
+ virtual const google::protobuf::Message* GetRequest() const override {
+ return nullptr;
+ };
+
+ const TMaybe<TString> GetRequestType() const override {
+ return Nothing();
+ };
+
+ void SetFinishAction(std::function<void()>&& cb) override {
+ Y_UNUSED(cb);
+ };
+
+ google::protobuf::Arena* GetArena() override {
+ return nullptr;
+ };
+
+ bool HasClientCapability(const TString& capability) const override {
+ Y_UNUSED(capability);
+ return false;
+ };
+
+ void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
+ processYdbStatusCode(status);
+ };
+
+ void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
+ Y_UNUSED(code);
+ Y_UNUSED(msg);
+ Y_UNUSED(details);
+ }
+
+ TString GetPeerName() const override {
+ return "";
+ }
+
+ TInstant GetDeadline() const override {
+ return TInstant();
+ }
+
+ const TMaybe<TString> GetPeerMetaValues(const TString&) const override {
+ return Nothing();
+ }
+
+ TVector<TStringBuf> FindClientCert() const override {
+ return TVector<TStringBuf>();
+ }
+
+ TMaybe<NKikimr::NRpcService::TRlPath> GetRlPath() const override {
+ return Nothing();
+ }
+
+ void RaiseIssue(const NYql::TIssue& issue) override {
+ ReplyMessage = issue.GetMessage();
+ Y_UNUSED(issue);
+ }
+
+ void RaiseIssues(const NYql::TIssues& issues) override {
+ Y_UNUSED(issues);
+ };
+
+ const TString& GetRequestName() const override {
+ return DummyString;
+ };
+
+ void SetDiskQuotaExceeded(bool disk) override {
+ Y_UNUSED(disk);
+ };
+
+ bool GetDiskQuotaExceeded() const override {
+ return false;
+ };
+
+ void AddAuditLogPart(const TStringBuf& name, const TString& value) override {
+ Y_UNUSED(name);
+ Y_UNUSED(value);
+ };
+
+ const NKikimr::NGRpcService::TAuditLogParts& GetAuditLogParts() const override {
+ return DummyAuditLogParts;
+ };
+
+ google::protobuf::Message* GetRequestMut() override {
+ return nullptr;
+ };
+
+ void SetRuHeader(ui64 ru) override {
+ Y_UNUSED(ru);
+ };
+
+ void AddServerHint(const TString& hint) override {
+ Y_UNUSED(hint);
+ };
+
+ void SetCostInfo(float consumed_units) override {
+ Y_UNUSED(consumed_units);
+ };
+
+ void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) override {
+ Y_UNUSED(cb);
+ };
+
+ void FinishStream(ui32 status) override {
+ Y_UNUSED(status);
+ };
+
+ void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
+ Y_UNUSED(in);
+ Y_UNUSED(status);
+ };
+
+ void Reply(NProtoBuf::Message* resp, ui32 status = 0) override {
+ Y_UNUSED(resp);
+ Y_UNUSED(status);
+ };
+
+ void SendOperation(const Ydb::Operations::Operation& operation) override {
+ Y_UNUSED(operation);
+ };
+
+ NWilson::TTraceId GetWilsonTraceId() const override {
+ return {};
+ }
+
+ void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
+ Y_UNUSED(result);
+ processYdbStatusCode(status);
+ };
+
+ void SendResult(
+ const google::protobuf::Message& result,
+ Ydb::StatusIds::StatusCode status,
+ const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {
+
+ Y_UNUSED(result);
+ Y_UNUSED(message);
+ processYdbStatusCode(status, std::optional(std::ref(message)));
+ };
+
+ void SendResult(
+ Ydb::StatusIds::StatusCode status,
+ const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {
+
+ Y_UNUSED(message);
+ processYdbStatusCode(status, std::optional(std::ref(message)));
+ };
+
+ const Ydb::Operations::OperationParams& operation_params() const {
+ return DummyParams;
+ }
+
+ static TKafkaCreatePartitionsRequest* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) {
+ return static_cast<TKafkaCreatePartitionsRequest*>(&(*request));
+ }
+
+protected:
+ void FinishRequest() override {
+ };
+
+private:
+ const Ydb::Operations::OperationParams DummyParams;
+ const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ const TString DummyString;
+ const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
+ const TString TopicPath;
+ const TString DatabaseName;
+ const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
+ TString ReplyMessage;
+
+ void processYdbStatusCode(
+ Ydb::StatusIds::StatusCode& status,
+ std::optional<std::reference_wrapper<const google::protobuf::RepeatedPtrField<
+ NKikimr::NGRpcService::TYdbIssueMessageType>>> issueMessagesOpt = std::nullopt) {
+
+ switch (status) {
+ case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
+ break;
+ case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
+ break;
+ case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR:
+ if (issueMessagesOpt.has_value()) {
+ auto& issueMessages = issueMessagesOpt.value().get();
+ bool hasPathNotExists = std::find_if(
+ issueMessages.begin(),
+ issueMessages.end(),
+ [](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; }
+ ) != issueMessages.end();
+
+ if (hasPathNotExists) {
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage);
+ } else {
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
+ }
+ } else {
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
+ }
+ break;
+ default:
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
+ }
+ }
+};
+
+class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest> {
+ using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
+public:
+
+ TCreatePartitionsActor(
+ TActorId requester,
+ TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+ TString topicPath,
+ TString databaseName,
+ ui32 partitionsNumber)
+ : TBase(new TKafkaCreatePartitionsRequest(
+ userToken,
+ topicPath,
+ databaseName,
+ [this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
+ this->SendResult(status, message);
+ })
+ )
+ , Requester(requester)
+ , TopicPath(topicPath)
+ , PartitionsNumber(partitionsNumber)
+ {
+ KAFKA_LOG_D(
+ "Create partitions actor. DatabaseName: " << databaseName <<
+ ". TopicPath: " << TopicPath <<
+ ". PartitionsNumber: " << PartitionsNumber);
+ };
+
+ ~TCreatePartitionsActor() = default;
+
+ void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
+ THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
+ response->Status = status;
+ response->TopicPath = TopicPath;
+ response->Message = message;
+ Send(Requester, response.Release());
+ Send(SelfId(), new TEvents::TEvPoison());
+ }
+
+ void FillProposeRequest(
+ NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal,
+ const TActorContext &ctx,
+ const TString &workingDir,
+ const TString &name) {
+
+ Y_UNUSED(ctx);
+
+ NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
+ modifyScheme.SetWorkingDir(workingDir);
+ modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
+
+ auto pqDescr = modifyScheme.MutableAlterPersQueueGroup();
+ pqDescr->SetTotalGroupCount(PartitionsNumber);
+ pqDescr->SetName(name);
+ };
+
+ void Bootstrap(const NActors::TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ SendProposeRequest(ctx);
+ Become(&TCreatePartitionsActor::StateWork);
+ };
+
+ void StateWork(TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle);
+ default:
+ TBase::StateWork(ev);
+ }
+ }
+
+ void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
+
+private:
+ const TActorId Requester;
+ const TString TopicPath;
+ const std::shared_ptr<TString> SerializedToken;
+ const ui32 PartitionsNumber;
+};
+
+NActors::IActor* CreateKafkaCreatePartitionsActor(
+ const TContext::TPtr context,
+ const ui64 correlationId,
+ const TMessagePtr<TCreatePartitionsRequestData>& message
+) {
+ return new TKafkaCreatePartitionsActor(context, correlationId, message);
+}
+
+void TKafkaCreatePartitionsActor::Bootstrap(const NActors::TActorContext& ctx) {
+ KAFKA_LOG_D(InputLogMessage());
+
+ if (Message->ValidateOnly) {
+ ProcessValidateOnly(ctx);
+ return;
+ }
+
+ std::unordered_set<TString> topicNames;
+ for (auto& topic : Message->Topics) {
+ auto& topicName = topic.Name.value();
+ if (topicNames.contains(topicName)) {
+ DuplicateTopicNames.insert(topicName);
+ } else {
+ topicNames.insert(topicName);
+ }
+ }
+
+ for (auto& topic : Message->Topics) {
+ auto& topicName = topic.Name.value();
+
+ if (DuplicateTopicNames.contains(topicName)) {
+ continue;
+ }
+
+ if (topicName == "") {
+ auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
+ result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST;
+ result->Message = "Empty topic name";
+ this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
+ continue;
+ }
+
+ ctx.Register(new TCreatePartitionsActor(
+ SelfId(),
+ Context->UserToken,
+ topic.Name.value(),
+ Context->DatabasePath,
+ topic.Count
+ ));
+
+ InflyTopics++;
+ }
+
+ if (InflyTopics > 0) {
+ Become(&TKafkaCreatePartitionsActor::StateWork);
+ } else {
+ Reply(ctx);
+ }
+};
+
+void TKafkaCreatePartitionsActor::Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx) {
+ auto eventPtr = ev->Release();
+ TopicNamesToResponses[eventPtr->TopicPath] = eventPtr;
+ InflyTopics--;
+ if (InflyTopics == 0) {
+ Reply(ctx);
+ }
+};
+
+void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
+ TCreatePartitionsResponseData::TPtr response = std::make_shared<TCreatePartitionsResponseData>();
+ EKafkaErrors responseStatus = NONE_ERROR;
+
+ for (auto& requestTopic : Message->Topics) {
+ auto topicName = requestTopic.Name.value();
+
+ TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic;
+ responseTopic.Name = topicName;
+
+ if (TopicNamesToResponses.contains(topicName)) {
+ responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
+ }
+
+ auto setError = [&responseTopic, &responseStatus](EKafkaErrors status) {
+ responseTopic.ErrorCode = status;
+ responseStatus = status;
+ };
+
+ if (DuplicateTopicNames.contains(topicName)) {
+ setError(DUPLICATE_RESOURCE);
+ } else {
+ switch (TopicNamesToResponses[topicName]->Status) {
+ case TEvKafka::TEvTopicModificationResponse::OK:
+ responseTopic.ErrorCode = NONE_ERROR;
+ break;
+ case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST:
+ case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST:
+ setError(INVALID_REQUEST);
+ break;
+ case TEvKafka::TEvTopicModificationResponse::ERROR:
+ setError(UNKNOWN_SERVER_ERROR);
+ break;
+ case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG:
+ setError(INVALID_CONFIG);
+ break;
+ }
+ }
+ response->Results.push_back(responseTopic);
+ }
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
+
+ Die(ctx);
+};
+
+TStringBuilder TKafkaCreatePartitionsActor::InputLogMessage() {
+ TStringBuilder stringBuilder;
+ stringBuilder << "Create partitions actor: New request. ValidateOnly:" << (Message->ValidateOnly != 0) << " Topics: [";
+
+ bool isFirst = true;
+ for (auto& requestTopic : Message->Topics) {
+ if (isFirst) {
+ isFirst = false;
+ } else {
+ stringBuilder << ",";
+ }
+ stringBuilder << " " << requestTopic.Name.value();
+ }
+ stringBuilder << " ]";
+ return stringBuilder;
+};
+
+
+void TKafkaCreatePartitionsActor::ProcessValidateOnly(const NActors::TActorContext& ctx) {
+ TCreatePartitionsResponseData::TPtr response = std::make_shared<TCreatePartitionsResponseData>();
+
+ for (auto& requestTopic : Message->Topics) {
+ auto topicName = requestTopic.Name.value();
+
+ TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic;
+ responseTopic.Name = topicName;
+ responseTopic.ErrorCode = NONE_ERROR;
+ response->Results.push_back(responseTopic);
+ }
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, NONE_ERROR));
+ Die(ctx);
+};
+}
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h
new file mode 100644
index 0000000000..60ea85e078
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.h
@@ -0,0 +1,41 @@
+#include "actors.h"
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+
+namespace NKafka {
+
+class TKafkaCreatePartitionsActor: public NActors::TActorBootstrapped<TKafkaCreatePartitionsActor> {
+public:
+ TKafkaCreatePartitionsActor(
+ const TContext::TPtr context,
+ const ui64 correlationId,
+ const TMessagePtr<TCreatePartitionsRequestData>& message)
+ : Context(context)
+ , CorrelationId(correlationId)
+ , Message(message) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+ void Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx);
+ void Reply(const TActorContext& ctx);
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKafka::TEvTopicModificationResponse, Handle);
+ }
+ }
+
+private:
+ const TContext::TPtr Context;
+ const ui64 CorrelationId;
+ const TMessagePtr<TCreatePartitionsRequestData> Message;
+ std::unordered_set<TString> DuplicateTopicNames;
+ ui32 InflyTopics = 0;
+ std::unordered_map<TString, TAutoPtr<TEvKafka::TEvTopicModificationResponse>> TopicNamesToResponses;
+
+ TStringBuilder InputLogMessage();
+ void ProcessValidateOnly(const NActors::TActorContext& ctx);
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
index 5702f844a2..ecb3ddd1b3 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
@@ -17,7 +17,7 @@ public:
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
- const std::function<void(TEvKafka::TEvCreateTopicsResponse::EStatus, TString&)> sendResultCallback)
+ const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
: UserToken(userToken)
, TopicPath(topicPath)
, DatabaseName(databaseName)
@@ -211,19 +211,19 @@ private:
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
const TString TopicPath;
const TString DatabaseName;
- const std::function<void(TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message)> SendResultCallback;
+ const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
TString ReplyMessage;
void processYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
switch (status) {
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
- SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::OK, ReplyMessage);
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
- SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST, ReplyMessage);
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
break;
default:
- SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::ERROR, ReplyMessage);
+ SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
}
};
@@ -244,7 +244,7 @@ public:
userToken,
topicPath,
databaseName,
- [this](TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) {
+ [this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
this->SendResult(status, message);
})
)
@@ -259,8 +259,8 @@ public:
~TCreateTopicActor() = default;
- void SendResult(TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) {
- THolder<TEvKafka::TEvCreateTopicsResponse> response(new TEvKafka::TEvCreateTopicsResponse());
+ void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
+ THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
response->Status = status;
response->TopicPath = TopicPath;
response->Message = message;
@@ -377,10 +377,10 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
}
if (topicName == "") {
- auto result = MakeHolder<TEvKafka::TEvCreateTopicsResponse>();
- result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST;
+ auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
+ result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST;
result->Message = "Empty topic name";
- this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvCreateTopicsResponse>(result.Release());
+ this->TopicNamesToResponses[topicName] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
continue;
}
@@ -394,10 +394,10 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
retention = std::stoul(config.Value.value());
return true;
} catch(std::invalid_argument) {
- auto result = MakeHolder<TEvKafka::TEvCreateTopicsResponse>();
- result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::INVALID_CONFIG;
+ auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
+ result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::INVALID_CONFIG;
result->Message = "Provided retention value is not a number";
- this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr<TEvKafka::TEvCreateTopicsResponse>(result.Release());
+ this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr<TEvKafka::TEvTopicModificationResponse>(result.Release());
return false;
}
};
@@ -446,7 +446,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
}
};
-void TKafkaCreateTopicsActor::Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx) {
+void TKafkaCreateTopicsActor::Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx) {
auto eventPtr = ev->Release();
KAFKA_LOG_D(TStringBuilder() << "Create topics actor. Topic's " << eventPtr->TopicPath << " response received." << std::to_string(eventPtr->Status));
TopicNamesToResponses[eventPtr->TopicPath] = eventPtr;
@@ -490,18 +490,22 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
setError(DUPLICATE_RESOURCE);
} else {
switch (TopicNamesToResponses[topicName]->Status) {
- case TEvKafka::TEvCreateTopicsResponse::OK:
+ case TEvKafka::TEvTopicModificationResponse::OK:
responseTopic.ErrorCode = NONE_ERROR;
addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME);
addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME);
break;
- case TEvKafka::TEvCreateTopicsResponse::BAD_REQUEST:
+ case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST:
setError(INVALID_REQUEST);
break;
- case TEvKafka::TEvCreateTopicsResponse::ERROR:
+ case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST:
+ KAFKA_LOG_ERROR("Create topics actor: Topic: [" << topicName << "]. Unexpected TOPIC_DOES_NOT_EXIST status received.");
setError(UNKNOWN_SERVER_ERROR);
break;
- case TEvKafka::TEvCreateTopicsResponse::INVALID_CONFIG:
+ case TEvKafka::TEvTopicModificationResponse::ERROR:
+ setError(UNKNOWN_SERVER_ERROR);
+ break;
+ case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG:
setError(INVALID_CONFIG);
break;
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h
index afa1657311..fad6dd15a2 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h
@@ -14,12 +14,12 @@ public:
}
void Bootstrap(const NActors::TActorContext& ctx);
- void Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(const TEvKafka::TEvTopicModificationResponse::TPtr& ev, const TActorContext& ctx);
void Reply(const TActorContext& ctx);
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvKafka::TEvCreateTopicsResponse, Handle);
+ HFunc(TEvKafka::TEvTopicModificationResponse, Handle);
}
}
@@ -29,7 +29,7 @@ private:
const TMessagePtr<TCreateTopicsRequestData> Message;
std::unordered_set<TString> DuplicateTopicNames;
ui32 InflyTopics = 0;
- std::unordered_map<TString, TAutoPtr<TEvKafka::TEvCreateTopicsResponse>> TopicNamesToResponses;
+ std::unordered_map<TString, TAutoPtr<TEvKafka::TEvTopicModificationResponse>> TopicNamesToResponses;
std::unordered_map<TString, std::pair<std::optional<ui64>, std::optional<ui64>>> TopicNamesToRetentions;
TStringBuilder InputLogMessage();
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 57e728f548..de4ca12a8d 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -306,6 +306,10 @@ protected:
Register(CreateKafkaCreateTopicsActor(Context, header->CorrelationId, message));
}
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TCreatePartitionsRequestData>& message) {
+ Register(CreateKafkaCreatePartitionsActor(Context, header->CorrelationId, message));
+ }
+
template<class T>
TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) {
return TMessagePtr<T>(request->Buffer, request->Message);
@@ -390,6 +394,10 @@ protected:
HandleMessage(&Request->Header, Cast<TCreateTopicsRequestData>(Request));
break;
+ case CREATE_PARTITIONS:
+ HandleMessage(&Request->Header, Cast<TCreatePartitionsRequestData>(Request));
+ break;
+
default:
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
PassAway();
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index e2132363a3..1bec0c622a 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -228,7 +228,7 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
};
-struct TEvCreateTopicsResponse : public NActors::TEventLocal<TEvCreateTopicsResponse, EvCreateTopicsResponse>
+struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
{
enum EStatus {
@@ -236,9 +236,10 @@ struct TEvCreateTopicsResponse : public NActors::TEventLocal<TEvCreateTopicsResp
ERROR,
BAD_REQUEST,
INVALID_CONFIG,
+ TOPIC_DOES_NOT_EXIST,
};
- TEvCreateTopicsResponse()
+ TEvTopicModificationResponse()
{}
TString TopicPath;
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 01b9c8c00c..8b08d98ce4 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -24,6 +24,7 @@ const std::unordered_map<EApiKey, TString> EApiKeyNames = {
{EApiKey::CREATE_TOPICS, "CREATE_TOPICS"},
{EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"},
{EApiKey::SASL_AUTHENTICATE, "SASL_AUTHENTICATE"},
+ {EApiKey::CREATE_PARTITIONS, "CREATE_PARTITIONS"},
};
@@ -61,6 +62,8 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) {
return std::make_unique<TInitProducerIdRequestData>();
case SASL_AUTHENTICATE:
return std::make_unique<TSaslAuthenticateRequestData>();
+ case CREATE_PARTITIONS:
+ return std::make_unique<TCreatePartitionsRequestData>();
default:
ythrow yexception() << "Unsupported request API key " << apiKey;
}
@@ -100,6 +103,8 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) {
return std::make_unique<TInitProducerIdResponseData>();
case SASL_AUTHENTICATE:
return std::make_unique<TSaslAuthenticateResponseData>();
+ case CREATE_PARTITIONS:
+ return std::make_unique<TCreatePartitionsResponseData>();
default:
ythrow yexception() << "Unsupported response API key " << apiKey;
}
@@ -199,6 +204,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 1;
}
+ case CREATE_PARTITIONS:
+ if (_version >= 2) {
+ return 2;
+ } else {
+ return 1;
+ }
default:
ythrow yexception() << "Unsupported API key " << apiKey;
break;
@@ -297,6 +308,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 0;
}
+ case CREATE_PARTITIONS:
+ if (_version >= 2) {
+ return 1;
+ } else {
+ return 0;
+ }
default:
ythrow yexception() << "Unsupported API key " << apiKey;
break;
@@ -5667,4 +5684,296 @@ i32 TSaslAuthenticateResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
+
+
+//
+// TCreatePartitionsRequestData
+//
+const TCreatePartitionsRequestData::TimeoutMsMeta::Type TCreatePartitionsRequestData::TimeoutMsMeta::Default = 0;
+const TCreatePartitionsRequestData::ValidateOnlyMeta::Type TCreatePartitionsRequestData::ValidateOnlyMeta::Default = false;
+
+TCreatePartitionsRequestData::TCreatePartitionsRequestData()
+ : TimeoutMs(TimeoutMsMeta::Default)
+ , ValidateOnly(ValidateOnlyMeta::Default)
+{}
+
+void TCreatePartitionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData";
+ }
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+ NPrivate::Read<TimeoutMsMeta>(_readable, _version, TimeoutMs);
+ NPrivate::Read<ValidateOnlyMeta>(_readable, _version, ValidateOnly);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TCreatePartitionsRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics);
+ NPrivate::Write<TimeoutMsMeta>(_collector, _writable, _version, TimeoutMs);
+ NPrivate::Write<ValidateOnlyMeta>(_collector, _writable, _version, ValidateOnly);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TCreatePartitionsRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<TopicsMeta>(_collector, _version, Topics);
+ NPrivate::Size<TimeoutMsMeta>(_collector, _version, TimeoutMs);
+ NPrivate::Size<ValidateOnlyMeta>(_collector, _version, ValidateOnly);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TCreatePartitionsRequestData::TCreatePartitionsTopic
+//
+const TCreatePartitionsRequestData::TCreatePartitionsTopic::NameMeta::Type TCreatePartitionsRequestData::TCreatePartitionsTopic::NameMeta::Default = {""};
+const TCreatePartitionsRequestData::TCreatePartitionsTopic::CountMeta::Type TCreatePartitionsRequestData::TCreatePartitionsTopic::CountMeta::Default = 0;
+
+TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsTopic()
+ : Name(NameMeta::Default)
+ , Count(CountMeta::Default)
+{}
+
+void TCreatePartitionsRequestData::TCreatePartitionsTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<CountMeta>(_readable, _version, Count);
+ NPrivate::Read<AssignmentsMeta>(_readable, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TCreatePartitionsRequestData::TCreatePartitionsTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<NameMeta>(_collector, _writable, _version, Name);
+ NPrivate::Write<CountMeta>(_collector, _writable, _version, Count);
+ NPrivate::Write<AssignmentsMeta>(_collector, _writable, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TCreatePartitionsRequestData::TCreatePartitionsTopic::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<NameMeta>(_collector, _version, Name);
+ NPrivate::Size<CountMeta>(_collector, _version, Count);
+ NPrivate::Size<AssignmentsMeta>(_collector, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment
+//
+
+TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::TCreatePartitionsAssignment()
+{}
+
+void TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment";
+ }
+ NPrivate::Read<BrokerIdsMeta>(_readable, _version, BrokerIds);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<BrokerIdsMeta>(_collector, _writable, _version, BrokerIds);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TCreatePartitionsRequestData::TCreatePartitionsTopic::TCreatePartitionsAssignment::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<BrokerIdsMeta>(_collector, _version, BrokerIds);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TCreatePartitionsResponseData
+//
+const TCreatePartitionsResponseData::ThrottleTimeMsMeta::Type TCreatePartitionsResponseData::ThrottleTimeMsMeta::Default = 0;
+
+TCreatePartitionsResponseData::TCreatePartitionsResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+{}
+
+void TCreatePartitionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ResultsMeta>(_readable, _version, Results);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TCreatePartitionsResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ResultsMeta>(_collector, _writable, _version, Results);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TCreatePartitionsResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ResultsMeta>(_collector, _version, Results);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TCreatePartitionsResponseData::TCreatePartitionsTopicResult
+//
+const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::NameMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::NameMeta::Default = {""};
+const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorCodeMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorCodeMeta::Default = 0;
+const TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorMessageMeta::Type TCreatePartitionsResponseData::TCreatePartitionsTopicResult::ErrorMessageMeta::Default = std::nullopt;
+
+TCreatePartitionsResponseData::TCreatePartitionsTopicResult::TCreatePartitionsTopicResult()
+ : Name(NameMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+ , ErrorMessage(ErrorMessageMeta::Default)
+{}
+
+void TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TCreatePartitionsResponseData::TCreatePartitionsTopicResult";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TCreatePartitionsResponseData::TCreatePartitionsTopicResult";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<NameMeta>(_collector, _writable, _version, Name);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TCreatePartitionsResponseData::TCreatePartitionsTopicResult::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<NameMeta>(_collector, _version, Name);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
} //namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index bbe56aabea..1b547a4c44 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -32,6 +32,7 @@ enum EApiKey {
CREATE_TOPICS = 19, // [ZK_BROKER, BROKER, CONTROLLER]
INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER]
SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER]
+ CREATE_PARTITIONS = 37, // [ZK_BROKER, BROKER, CONTROLLER]
};
extern const std::unordered_map<EApiKey, TString> EApiKeyNames;
@@ -6181,4 +6182,281 @@ public:
bool operator==(const TSaslAuthenticateResponseData& other) const = default;
};
+
+class TCreatePartitionsRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TCreatePartitionsRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TCreatePartitionsRequestData();
+ ~TCreatePartitionsRequestData() = default;
+
+ class TCreatePartitionsTopic : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TCreatePartitionsTopic();
+ ~TCreatePartitionsTopic() = default;
+
+ class TCreatePartitionsAssignment : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TCreatePartitionsAssignment();
+ ~TCreatePartitionsAssignment() = default;
+
+ struct BrokerIdsMeta {
+ using ItemType = TKafkaInt32;
+ using ItemTypeDesc = NPrivate::TKafkaIntDesc;
+ using Type = std::vector<TKafkaInt32>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "brokerIds";
+ static constexpr const char* About = "The assigned broker IDs.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ BrokerIdsMeta::Type BrokerIds;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCreatePartitionsAssignment& other) const = default;
+ };
+
+ struct NameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "name";
+ static constexpr const char* About = "The topic name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ NameMeta::Type Name;
+
+ struct CountMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "count";
+ static constexpr const char* About = "The new partition count.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ CountMeta::Type Count;
+
+ struct AssignmentsMeta {
+ using ItemType = TCreatePartitionsAssignment;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TCreatePartitionsAssignment>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "assignments";
+ static constexpr const char* About = "The new partition assignments.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ AssignmentsMeta::Type Assignments;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCreatePartitionsTopic& other) const = default;
+ };
+
+ struct TopicsMeta {
+ using ItemType = TCreatePartitionsTopic;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TCreatePartitionsTopic>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "topics";
+ static constexpr const char* About = "Each topic that we want to create new partitions inside.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ TopicsMeta::Type Topics;
+
+ struct TimeoutMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "timeoutMs";
+ static constexpr const char* About = "The time in ms to wait for the partitions to be created.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ TimeoutMsMeta::Type TimeoutMs;
+
+ struct ValidateOnlyMeta {
+ using Type = TKafkaBool;
+ using TypeDesc = NPrivate::TKafkaBoolDesc;
+
+ static constexpr const char* Name = "validateOnly";
+ static constexpr const char* About = "If true, then validate the request, but don't actually increase the number of partitions.";
+ static const Type Default; // = false;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ValidateOnlyMeta::Type ValidateOnly;
+
+ i16 ApiKey() const override { return CREATE_PARTITIONS; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCreatePartitionsRequestData& other) const = default;
+};
+
+
+class TCreatePartitionsResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TCreatePartitionsResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TCreatePartitionsResponseData();
+ ~TCreatePartitionsResponseData() = default;
+
+ class TCreatePartitionsTopicResult : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TCreatePartitionsTopicResult();
+ ~TCreatePartitionsTopicResult() = default;
+
+ struct NameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "name";
+ static constexpr const char* About = "The topic name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ NameMeta::Type Name;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The result error, or zero if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct ErrorMessageMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "errorMessage";
+ static constexpr const char* About = "The result message, or null if there was no error.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ErrorMessageMeta::Type ErrorMessage;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCreatePartitionsTopicResult& other) const = default;
+ };
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ResultsMeta {
+ using ItemType = TCreatePartitionsTopicResult;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TCreatePartitionsTopicResult>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "results";
+ static constexpr const char* About = "The partition creation results for each topic.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ResultsMeta::Type Results;
+
+ i16 ApiKey() const override { return CREATE_PARTITIONS; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCreatePartitionsResponseData& other) const = default;
+};
+
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index d590686576..12e906c395 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -634,6 +634,25 @@ public:
return WriteAndRead<TCreateTopicsResponseData>(header, request);
}
+ TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TopicToCreate> topicsToCreate, bool validateOnly = false) {
+ Cerr << ">>>>> TCreateTopicsRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3);
+ TCreatePartitionsRequestData request;
+ request.ValidateOnly = validateOnly;
+ request.TimeoutMs = 100;
+
+ for (auto& topicToCreate : topicsToCreate) {
+ NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic;
+ topic.Name = topicToCreate.Name;
+ topic.Count = topicToCreate.PartitionsNumber;
+
+ request.Topics.push_back(topic);
+ }
+
+ return WriteAndRead<TCreatePartitionsResponseData>(header, request);
+ }
+
void UnknownApiKey() {
Cerr << ">>>>> Unknown apiKey\n";
@@ -1730,6 +1749,159 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
} // Y_UNIT_TEST(CreateTopicsScenario)
+ Y_UNIT_TEST(CreatePartitionsScenario) {
+
+ TInsecureTestServer testServer("2");
+
+ TString topic1Name = "/Root/topic-1-test";
+ TString shortTopic1Name = "topic-1-test";
+
+ TString topic2Name = "/Root/topic-2-test";
+ TString shortTopic2Name = "topic-2-test";
+
+ TString key = "record-key";
+ TString value = "record-value";
+ TString headerKey = "header-key";
+ TString headerValue = "header-value";
+
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ {
+ auto result =
+ pqClient
+ .CreateTopic(topic1Name,
+ NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(10, 100))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ {
+ auto result =
+ pqClient
+ .CreateTopic(topic2Name,
+ NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(20, 100))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ TTestClient client(testServer.Port);
+
+ {
+ auto msg = client.ApiVersions();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 15u);
+ }
+
+ {
+ auto msg = client.SaslHandshake();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+
+ {
+ auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
+
+ {
+ // Validate only
+ auto msg = client.CreatePartitions({
+ TopicToCreate(topic1Name, 11),
+ TopicToCreate(topic2Name, 21)
+ }, true);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topic1Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), topic2Name);
+
+ auto result0 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result0.IsSuccess());
+ UNIT_ASSERT_EQUAL(result0.GetTopicDescription().GetPartitions().size(), 10);
+
+ auto result1 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result1.IsSuccess());
+ UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 20);
+ }
+
+ {
+ // Increase partitions number
+ auto msg = client.CreatePartitions({
+ TopicToCreate(shortTopic1Name, 11),
+ TopicToCreate(shortTopic2Name, 21)
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), shortTopic2Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].ErrorCode, NONE_ERROR);
+
+ auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result1.IsSuccess());
+ UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11);
+
+ auto result2 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result2.IsSuccess());
+ UNIT_ASSERT_EQUAL(result2.GetTopicDescription().GetPartitions().size(), 21);
+ }
+
+ {
+ // Check with two same topic names
+ auto msg = client.CreatePartitions({
+ TopicToCreate(shortTopic1Name, 11),
+ TopicToCreate(shortTopic1Name, 11)
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, DUPLICATE_RESOURCE);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), shortTopic1Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].ErrorCode, DUPLICATE_RESOURCE);
+
+ auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result1.IsSuccess());
+ UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11);
+
+ auto result2 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result2.IsSuccess());
+ UNIT_ASSERT_EQUAL(result2.GetTopicDescription().GetPartitions().size(), 21);
+ }
+
+ {
+ // Check with lesser partitions number
+ auto msg = client.CreatePartitions({ TopicToCreate(shortTopic1Name, 1) });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST);
+
+ auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result1.IsSuccess());
+ UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11);
+ }
+
+ {
+ // Check with nonexistent topic name
+ auto topicName = "NonExTopicName";
+ auto msg = client.CreatePartitions({ TopicToCreate(topicName, 1) });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topicName);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST);
+
+ auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result1.IsSuccess());
+ UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11);
+ }
+ } // Y_UNIT_TEST(CreatePartitionsScenario)
+
Y_UNIT_TEST(LoginWithApiKey) {
TInsecureTestServer testServer;
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index a74745d4d8..5567492196 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -16,6 +16,7 @@ SRCS(
actors/kafka_offset_fetch_actor.cpp
actors/kafka_offset_commit_actor.cpp
actors/kafka_create_topics_actor.cpp
+ actors/kafka_create_partitions_actor.cpp
kafka_connection.cpp
kafka_connection.h
kafka_constants.h