summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <[email protected]>2024-05-21 14:39:16 +0300
committernadya73 <[email protected]>2024-05-21 14:50:52 +0300
commitc2982258ce9938ec66a65fc04657010f10a2fe42 (patch)
tree5ffacdf084d1bcf35a58858476273f77ac94490e
parent8c2f10cdbdfaa2fe61a2d09b380a696ff2f299db (diff)
[kafka] YT-21789: Support authentication
Support authentication 028e169116580225d34d4a0b047ee3133d7b1a1f
-rw-r--r--yt/yt/client/kafka/error.h3
-rw-r--r--yt/yt/client/kafka/requests.cpp8
-rw-r--r--yt/yt/client/kafka/requests.h8
3 files changed, 11 insertions, 8 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h
index c57d6e16018..9587ef2ae08 100644
--- a/yt/yt/client/kafka/error.h
+++ b/yt/yt/client/kafka/error.h
@@ -8,6 +8,9 @@ namespace NYT::NKafka {
DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t,
((None) (0))
+ ((TopicAuthorizationFailed) (29))
+ ((GroupAuthorizationFailed) (30))
+ ((SaslAuthenticationFailed) (31))
((UnsupportedSaslMechanism) (33))
);
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
index 499d2f82725..f3f6362c71e 100644
--- a/yt/yt/client/kafka/requests.cpp
+++ b/yt/yt/client/kafka/requests.cpp
@@ -149,7 +149,7 @@ void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion)
void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
{
- writer->WriteInt16(ErrorCode);
+ writer->WriteErrorCode(ErrorCode);
writer->WriteInt32(PartitionIndex);
writer->WriteInt32(LeaderId);
writer->WriteInt32(ReplicaNodes.size());
@@ -164,7 +164,7 @@ void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*a
void TRspMetadataTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
{
- writer->WriteInt16(ErrorCode);
+ writer->WriteErrorCode(ErrorCode);
writer->WriteString(Name);
if (apiVersion >= 1) {
writer->WriteBool(IsInternal);
@@ -327,7 +327,7 @@ void TRspOffsetFetchTopicPartition::Serialize(IKafkaProtocolWriter* writer, int
writer->WriteInt32(PartitionIndex);
writer->WriteInt64(CommittedOffset);
writer->WriteNullableString(Metadata);
- writer->WriteInt16(ErrorCode);
+ writer->WriteErrorCode(ErrorCode);
}
void TRspOffsetFetchTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
@@ -396,7 +396,7 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
{
writer->WriteInt32(PartitionIndex);
- writer->WriteInt16(ErrorCode);
+ writer->WriteErrorCode(ErrorCode);
writer->WriteInt64(HighWatermark);
if (!Records) {
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index b0c636b49e4..f72e2005646 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -120,7 +120,7 @@ struct TRspMetadataBroker
struct TRspMetadataTopicPartition
{
- int16_t ErrorCode = 0;
+ EErrorCode ErrorCode = EErrorCode::None;
int32_t PartitionIndex = 0;
int32_t LeaderId = 0;
@@ -134,7 +134,7 @@ struct TRspMetadataTopicPartition
struct TRspMetadataTopic
{
- int16_t ErrorCode = 0;
+ EErrorCode ErrorCode = EErrorCode::None;
TString Name;
TGUID TopicId;
bool IsInternal = false;
@@ -317,7 +317,7 @@ struct TRspOffsetFetchTopicPartition
int32_t PartitionIndex = 0;
int64_t CommittedOffset = 0;
std::optional<TString> Metadata;
- int16_t ErrorCode;
+ EErrorCode ErrorCode = EErrorCode::None;
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
};
@@ -394,7 +394,7 @@ struct TRecord
struct TRspFetchResponsePartition
{
int32_t PartitionIndex = 0;
- int16_t ErrorCode = 0;
+ EErrorCode ErrorCode = EErrorCode::None;
int64_t HighWatermark = 0;
std::optional<std::vector<TRecord>> Records;