diff options
author | nadya73 <[email protected]> | 2024-05-21 14:39:16 +0300 |
---|---|---|
committer | nadya73 <[email protected]> | 2024-05-21 14:50:52 +0300 |
commit | c2982258ce9938ec66a65fc04657010f10a2fe42 (patch) | |
tree | 5ffacdf084d1bcf35a58858476273f77ac94490e | |
parent | 8c2f10cdbdfaa2fe61a2d09b380a696ff2f299db (diff) |
[kafka] YT-21789: Support authentication
Support authentication
028e169116580225d34d4a0b047ee3133d7b1a1f
-rw-r--r-- | yt/yt/client/kafka/error.h | 3 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.cpp | 8 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 8 |
3 files changed, 11 insertions, 8 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h index c57d6e16018..9587ef2ae08 100644 --- a/yt/yt/client/kafka/error.h +++ b/yt/yt/client/kafka/error.h @@ -8,6 +8,9 @@ namespace NYT::NKafka { DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t, ((None) (0)) + ((TopicAuthorizationFailed) (29)) + ((GroupAuthorizationFailed) (30)) + ((SaslAuthenticationFailed) (31)) ((UnsupportedSaslMechanism) (33)) ); diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp index 499d2f82725..f3f6362c71e 100644 --- a/yt/yt/client/kafka/requests.cpp +++ b/yt/yt/client/kafka/requests.cpp @@ -149,7 +149,7 @@ void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion) void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const { - writer->WriteInt16(ErrorCode); + writer->WriteErrorCode(ErrorCode); writer->WriteInt32(PartitionIndex); writer->WriteInt32(LeaderId); writer->WriteInt32(ReplicaNodes.size()); @@ -164,7 +164,7 @@ void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*a void TRspMetadataTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const { - writer->WriteInt16(ErrorCode); + writer->WriteErrorCode(ErrorCode); writer->WriteString(Name); if (apiVersion >= 1) { writer->WriteBool(IsInternal); @@ -327,7 +327,7 @@ void TRspOffsetFetchTopicPartition::Serialize(IKafkaProtocolWriter* writer, int writer->WriteInt32(PartitionIndex); writer->WriteInt64(CommittedOffset); writer->WriteNullableString(Metadata); - writer->WriteInt16(ErrorCode); + writer->WriteErrorCode(ErrorCode); } void TRspOffsetFetchTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const @@ -396,7 +396,7 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const { writer->WriteInt32(PartitionIndex); - writer->WriteInt16(ErrorCode); + writer->WriteErrorCode(ErrorCode); writer->WriteInt64(HighWatermark); if (!Records) { diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index b0c636b49e4..f72e2005646 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -120,7 +120,7 @@ struct TRspMetadataBroker struct TRspMetadataTopicPartition { - int16_t ErrorCode = 0; + EErrorCode ErrorCode = EErrorCode::None; int32_t PartitionIndex = 0; int32_t LeaderId = 0; @@ -134,7 +134,7 @@ struct TRspMetadataTopicPartition struct TRspMetadataTopic { - int16_t ErrorCode = 0; + EErrorCode ErrorCode = EErrorCode::None; TString Name; TGUID TopicId; bool IsInternal = false; @@ -317,7 +317,7 @@ struct TRspOffsetFetchTopicPartition int32_t PartitionIndex = 0; int64_t CommittedOffset = 0; std::optional<TString> Metadata; - int16_t ErrorCode; + EErrorCode ErrorCode = EErrorCode::None; void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; }; @@ -394,7 +394,7 @@ struct TRecord struct TRspFetchResponsePartition { int32_t PartitionIndex = 0; - int16_t ErrorCode = 0; + EErrorCode ErrorCode = EErrorCode::None; int64_t HighWatermark = 0; std::optional<std::vector<TRecord>> Records; |