diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-11 14:49:27 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-12 10:53:54 +0300 |
commit | f63cf8e7aa4a6d8c8361fd595949056678b6a12a (patch) | |
tree | e121167156283270cd4420b4bb8f6e5633857287 /yt | |
parent | 305278d03ff0b0c667947b1e584f1fba4a7d91c2 (diff) | |
download | ydb-f63cf8e7aa4a6d8c8361fd595949056678b6a12a.tar.gz |
[kafka] YT-21744: Cosmetics
7e1e2306abcd13040873396f6f3b594d9c08da38
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/client/kafka/error.h | 2 | ||||
-rw-r--r-- | yt/yt/client/kafka/packet.cpp | 9 | ||||
-rw-r--r-- | yt/yt/client/kafka/protocol.cpp | 34 | ||||
-rw-r--r-- | yt/yt/client/kafka/protocol.h | 4 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 93 |
5 files changed, 50 insertions, 92 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h index 0a71016ed0..a3ea32e12f 100644 --- a/yt/yt/client/kafka/error.h +++ b/yt/yt/client/kafka/error.h @@ -6,7 +6,7 @@ namespace NYT::NKafka { //////////////////////////////////////////////////////////////////////////////// -DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t, +DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16, ((UnknownServerError) (-1)) ((None) (0)) ((TopicAuthorizationFailed) (29)) diff --git a/yt/yt/client/kafka/packet.cpp b/yt/yt/client/kafka/packet.cpp index 8866c8f622..f1968cfa62 100644 --- a/yt/yt/client/kafka/packet.cpp +++ b/yt/yt/client/kafka/packet.cpp @@ -19,8 +19,9 @@ struct TPacketDecoderTag //////////////////////////////////////////////////////////////////////////////// -struct TPacketTranscoderBase +class TPacketTranscoderBase { +protected: union { int MessageSize; char Data[sizeof(int)]; @@ -189,11 +190,7 @@ public: YT_ASSERT(flags == EPacketFlags::None); YT_ASSERT(!messageParts.Empty()); - i64 messageSize = 0; - for (const auto& messagePart : messageParts) { - messageSize += messagePart.size(); - } - Header_.MessageSize = messageSize; + Header_.MessageSize = messageParts.ByteSize(); std::reverse(std::begin(Header_.Data), std::end(Header_.Data)); MessageParts_ = std::move(messageParts); diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp index aacd170d68..3ac1ef1078 100644 --- a/yt/yt/client/kafka/protocol.cpp +++ b/yt/yt/client/kafka/protocol.cpp @@ -4,7 +4,7 @@ #include <library/cpp/yt/coding/varint.h> -#include <util/generic/guid.h> +#include <library/cpp/yt/string/guid.h> namespace NYT::NKafka { @@ -137,11 +137,11 @@ public: return result; } - TGUID ReadUuid() override + TGuid ReadUuid() override { TString value; ReadString(&value, 16); - return GetGuid(value); + return TGuid::FromString(value); } void ReadString(TString* result, int length) override @@ -173,7 +173,7 @@ public: if (needReadCount) { size = ReadInt32(); } - BytesBegins_.push_back(Offset_); + BytesOffsets_.push_back(Offset_); return size; } @@ -183,22 +183,22 @@ public: if (needReadCount) { size = ReadUnsignedVarInt() - 1; } - BytesBegins_.push_back(Offset_); + BytesOffsets_.push_back(Offset_); return size; } i32 GetReadBytesCount() override { - if (!BytesBegins_.empty()) { - return Offset_ - BytesBegins_.back(); + if (!BytesOffsets_.empty()) { + return Offset_ - BytesOffsets_.back(); } return 0; } void FinishReadBytes() override { - if (!BytesBegins_.empty()) { - return BytesBegins_.pop_back(); + if (!BytesOffsets_.empty()) { + return BytesOffsets_.pop_back(); } } @@ -225,7 +225,7 @@ private: const TSharedRef Data_; i64 Offset_ = 0; - std::vector<i64> BytesBegins_; + std::vector<i64> BytesOffsets_; template <typename T> T DoReadInt() @@ -314,9 +314,9 @@ public: Size_ += WriteVarUint32(Buffer_.begin() + Size_, value); } - void WriteUuid(TGUID value) override + void WriteUuid(TGuid value) override { - WriteString(value.AsUuidString()); + WriteString(ToString(value)); } void WriteErrorCode(EErrorCode value) override @@ -378,14 +378,14 @@ public: void StartBytes() override { WriteInt32(0); - BytesBegins_.push_back(Size_); + BytesOffsets_.push_back(Size_); } void FinishBytes() override { - YT_VERIFY(!BytesBegins_.empty()); - DoWriteInt<int32_t>(Size_ - BytesBegins_.back(), BytesBegins_.back() - sizeof(int32_t)); - BytesBegins_.pop_back(); + YT_VERIFY(!BytesOffsets_.empty()); + DoWriteInt<int32_t>(Size_ - BytesOffsets_.back(), BytesOffsets_.back() - sizeof(int32_t)); + BytesOffsets_.pop_back(); } TSharedRef Finish() override @@ -403,7 +403,7 @@ private: TSharedMutableRef Buffer_; i64 Size_ = 0; - std::vector<i64> BytesBegins_; + std::vector<i64> BytesOffsets_; template <typename T> void DoWriteInt(T value, std::optional<i64> position = std::nullopt) diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h index 10985777f3..0ed7464991 100644 --- a/yt/yt/client/kafka/protocol.h +++ b/yt/yt/client/kafka/protocol.h @@ -25,7 +25,7 @@ struct IKafkaProtocolReader virtual i64 ReadVarLong() = 0; virtual ui32 ReadUnsignedVarInt() = 0; - virtual TGUID ReadUuid() = 0; + virtual TGuid ReadUuid() = 0; virtual TString ReadString() = 0; virtual TString ReadCompactString() = 0; @@ -70,7 +70,7 @@ struct IKafkaProtocolWriter virtual void WriteVarLong(i64 value) = 0; virtual void WriteUnsignedVarInt(ui32 value) = 0; - virtual void WriteUuid(TGUID value) = 0; + virtual void WriteUuid(TGuid value) = 0; virtual void WriteErrorCode(EErrorCode value) = 0; diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index 9c9dec3aa1..d3f44673e3 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -4,7 +4,7 @@ #include "protocol.h" -#include <util/generic/guid.h> +#include <library/cpp/yt/misc/guid.h> namespace NYT::NKafka { @@ -37,7 +37,6 @@ struct TTaggedField TString Data; void Serialize(IKafkaProtocolWriter* writer) const; - void Deserialize(IKafkaProtocolReader* reader); }; @@ -87,7 +86,6 @@ struct TMessage TString Value; void Serialize(IKafkaProtocolWriter* writer, int version) const; - void Deserialize(IKafkaProtocolReader* reader, int version); }; @@ -117,7 +115,6 @@ struct TRecord std::vector<TMessage> Messages; void Serialize(IKafkaProtocolWriter* writer) const; - void Deserialize(IKafkaProtocolReader* reader); }; @@ -164,16 +161,13 @@ void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isComp struct TReqApiVersions { + static constexpr ERequestType RequestType = ERequestType::ApiVersions; + TString ClientSoftwareName; TString ClientSoftwareVersion; std::vector<TTaggedField> TagBuffer; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::ApiVersions; - } }; struct TRspApiKey @@ -200,7 +194,7 @@ struct TRspApiVersions struct TReqMetadataTopic { - TGUID TopicId; + TGuid TopicId; TString Topic; std::vector<TTaggedField> TagBuffer; @@ -209,6 +203,8 @@ struct TReqMetadataTopic struct TReqMetadata { + static constexpr ERequestType RequestType = ERequestType::Metadata; + std::vector<TReqMetadataTopic> Topics; bool AllowAutoTopicCreation; bool IncludeClusterAuthorizedOperations; @@ -216,11 +212,6 @@ struct TReqMetadata std::vector<TTaggedField> TagBuffer; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::Metadata; - } }; struct TRspMetadataBroker @@ -253,7 +244,7 @@ struct TRspMetadataTopic { EErrorCode ErrorCode = EErrorCode::None; TString Name; - TGUID TopicId; + TGuid TopicId; bool IsInternal = false; std::vector<TRspMetadataTopicPartition> Partitions; i32 TopicAuthorizedOperations = 0; @@ -278,14 +269,11 @@ struct TRspMetadata struct TReqFindCoordinator { + static constexpr ERequestType RequestType = ERequestType::FindCoordinator; + TString Key; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::FindCoordinator; - } }; struct TRspFindCoordinator @@ -310,6 +298,8 @@ struct TReqJoinGroupProtocol struct TReqJoinGroup { + static constexpr ERequestType RequestType = ERequestType::JoinGroup; + TString GroupId; i32 SessionTimeoutMs = 0; TString MemberId; @@ -317,11 +307,6 @@ struct TReqJoinGroup std::vector<TReqJoinGroupProtocol> Protocols; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::JoinGroup; - } }; struct TRspJoinGroupMember @@ -356,17 +341,14 @@ struct TReqSyncGroupAssignment struct TReqSyncGroup { + static constexpr ERequestType RequestType = ERequestType::SyncGroup; + TString GroupId; TString GenerationId; TString MemberId; std::vector<TReqSyncGroupAssignment> Assignments; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::SyncGroup; - } }; struct TRspSyncGroupAssignment @@ -389,16 +371,13 @@ struct TRspSyncGroup struct TReqHeartbeat { + static constexpr ERequestType RequestType = ERequestType::Heartbeat; + TString GroupId; i32 GenerationId = 0; TString MemberId; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::Heartbeat; - } }; struct TRspHeartbeat @@ -429,15 +408,12 @@ struct TReqOffsetCommitTopic struct TReqOffsetCommit { + static constexpr ERequestType RequestType = ERequestType::OffsetCommit; + TString GroupId; std::vector<TReqOffsetCommitTopic> Topics; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::OffsetCommit; - } }; struct TRspOffsetCommitTopicPartition @@ -474,15 +450,12 @@ struct TReqOffsetFetchTopic struct TReqOffsetFetch { + static constexpr ERequestType RequestType = ERequestType::OffsetFetch; + TString GroupId; std::vector<TReqOffsetFetchTopic> Topics; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::OffsetFetch; - } }; struct TRspOffsetFetchTopicPartition @@ -531,17 +504,14 @@ struct TReqFetchTopic struct TReqFetch { + static constexpr ERequestType RequestType = ERequestType::Fetch; + i32 ReplicaId = 0; i32 MaxWaitMs = 0; i32 MinBytes = 0; std::vector<TReqFetchTopic> Topics; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::Fetch; - } }; struct TRspFetchResponsePartition @@ -573,14 +543,11 @@ struct TRspFetch struct TReqSaslHandshake { + static constexpr ERequestType RequestType = ERequestType::SaslHandshake; + TString Mechanism; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::SaslHandshake; - } }; struct TRspSaslHandshake @@ -595,14 +562,11 @@ struct TRspSaslHandshake struct TReqSaslAuthenticate { + static constexpr ERequestType RequestType = ERequestType::SaslAuthenticate; + TString AuthBytes; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::SaslAuthenticate; - } }; struct TRspSaslAuthenticate @@ -636,6 +600,8 @@ struct TReqProduceTopicData struct TReqProduce { + static constexpr ERequestType RequestType = ERequestType::Produce; + std::optional<TString> TransactionalId; i16 Acks = 0; i32 TimeoutMs = 0; @@ -643,11 +609,6 @@ struct TReqProduce std::vector<TTaggedField> TagBuffer; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); - - static ERequestType GetRequestType() - { - return ERequestType::Produce; - } }; struct TRspProduceResponsePartitionResponseRecordError |