aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-11 14:49:27 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-12 10:53:54 +0300
commitf63cf8e7aa4a6d8c8361fd595949056678b6a12a (patch)
treee121167156283270cd4420b4bb8f6e5633857287
parent305278d03ff0b0c667947b1e584f1fba4a7d91c2 (diff)
downloadydb-f63cf8e7aa4a6d8c8361fd595949056678b6a12a.tar.gz
[kafka] YT-21744: Cosmetics
7e1e2306abcd13040873396f6f3b594d9c08da38
-rw-r--r--yt/yt/client/kafka/error.h2
-rw-r--r--yt/yt/client/kafka/packet.cpp9
-rw-r--r--yt/yt/client/kafka/protocol.cpp34
-rw-r--r--yt/yt/client/kafka/protocol.h4
-rw-r--r--yt/yt/client/kafka/requests.h93
5 files changed, 50 insertions, 92 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h
index 0a71016ed0..a3ea32e12f 100644
--- a/yt/yt/client/kafka/error.h
+++ b/yt/yt/client/kafka/error.h
@@ -6,7 +6,7 @@ namespace NYT::NKafka {
////////////////////////////////////////////////////////////////////////////////
-DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t,
+DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16,
((UnknownServerError) (-1))
((None) (0))
((TopicAuthorizationFailed) (29))
diff --git a/yt/yt/client/kafka/packet.cpp b/yt/yt/client/kafka/packet.cpp
index 8866c8f622..f1968cfa62 100644
--- a/yt/yt/client/kafka/packet.cpp
+++ b/yt/yt/client/kafka/packet.cpp
@@ -19,8 +19,9 @@ struct TPacketDecoderTag
////////////////////////////////////////////////////////////////////////////////
-struct TPacketTranscoderBase
+class TPacketTranscoderBase
{
+protected:
union {
int MessageSize;
char Data[sizeof(int)];
@@ -189,11 +190,7 @@ public:
YT_ASSERT(flags == EPacketFlags::None);
YT_ASSERT(!messageParts.Empty());
- i64 messageSize = 0;
- for (const auto& messagePart : messageParts) {
- messageSize += messagePart.size();
- }
- Header_.MessageSize = messageSize;
+ Header_.MessageSize = messageParts.ByteSize();
std::reverse(std::begin(Header_.Data), std::end(Header_.Data));
MessageParts_ = std::move(messageParts);
diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp
index aacd170d68..3ac1ef1078 100644
--- a/yt/yt/client/kafka/protocol.cpp
+++ b/yt/yt/client/kafka/protocol.cpp
@@ -4,7 +4,7 @@
#include <library/cpp/yt/coding/varint.h>
-#include <util/generic/guid.h>
+#include <library/cpp/yt/string/guid.h>
namespace NYT::NKafka {
@@ -137,11 +137,11 @@ public:
return result;
}
- TGUID ReadUuid() override
+ TGuid ReadUuid() override
{
TString value;
ReadString(&value, 16);
- return GetGuid(value);
+ return TGuid::FromString(value);
}
void ReadString(TString* result, int length) override
@@ -173,7 +173,7 @@ public:
if (needReadCount) {
size = ReadInt32();
}
- BytesBegins_.push_back(Offset_);
+ BytesOffsets_.push_back(Offset_);
return size;
}
@@ -183,22 +183,22 @@ public:
if (needReadCount) {
size = ReadUnsignedVarInt() - 1;
}
- BytesBegins_.push_back(Offset_);
+ BytesOffsets_.push_back(Offset_);
return size;
}
i32 GetReadBytesCount() override
{
- if (!BytesBegins_.empty()) {
- return Offset_ - BytesBegins_.back();
+ if (!BytesOffsets_.empty()) {
+ return Offset_ - BytesOffsets_.back();
}
return 0;
}
void FinishReadBytes() override
{
- if (!BytesBegins_.empty()) {
- return BytesBegins_.pop_back();
+ if (!BytesOffsets_.empty()) {
+ return BytesOffsets_.pop_back();
}
}
@@ -225,7 +225,7 @@ private:
const TSharedRef Data_;
i64 Offset_ = 0;
- std::vector<i64> BytesBegins_;
+ std::vector<i64> BytesOffsets_;
template <typename T>
T DoReadInt()
@@ -314,9 +314,9 @@ public:
Size_ += WriteVarUint32(Buffer_.begin() + Size_, value);
}
- void WriteUuid(TGUID value) override
+ void WriteUuid(TGuid value) override
{
- WriteString(value.AsUuidString());
+ WriteString(ToString(value));
}
void WriteErrorCode(EErrorCode value) override
@@ -378,14 +378,14 @@ public:
void StartBytes() override
{
WriteInt32(0);
- BytesBegins_.push_back(Size_);
+ BytesOffsets_.push_back(Size_);
}
void FinishBytes() override
{
- YT_VERIFY(!BytesBegins_.empty());
- DoWriteInt<int32_t>(Size_ - BytesBegins_.back(), BytesBegins_.back() - sizeof(int32_t));
- BytesBegins_.pop_back();
+ YT_VERIFY(!BytesOffsets_.empty());
+ DoWriteInt<int32_t>(Size_ - BytesOffsets_.back(), BytesOffsets_.back() - sizeof(int32_t));
+ BytesOffsets_.pop_back();
}
TSharedRef Finish() override
@@ -403,7 +403,7 @@ private:
TSharedMutableRef Buffer_;
i64 Size_ = 0;
- std::vector<i64> BytesBegins_;
+ std::vector<i64> BytesOffsets_;
template <typename T>
void DoWriteInt(T value, std::optional<i64> position = std::nullopt)
diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h
index 10985777f3..0ed7464991 100644
--- a/yt/yt/client/kafka/protocol.h
+++ b/yt/yt/client/kafka/protocol.h
@@ -25,7 +25,7 @@ struct IKafkaProtocolReader
virtual i64 ReadVarLong() = 0;
virtual ui32 ReadUnsignedVarInt() = 0;
- virtual TGUID ReadUuid() = 0;
+ virtual TGuid ReadUuid() = 0;
virtual TString ReadString() = 0;
virtual TString ReadCompactString() = 0;
@@ -70,7 +70,7 @@ struct IKafkaProtocolWriter
virtual void WriteVarLong(i64 value) = 0;
virtual void WriteUnsignedVarInt(ui32 value) = 0;
- virtual void WriteUuid(TGUID value) = 0;
+ virtual void WriteUuid(TGuid value) = 0;
virtual void WriteErrorCode(EErrorCode value) = 0;
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index 9c9dec3aa1..d3f44673e3 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -4,7 +4,7 @@
#include "protocol.h"
-#include <util/generic/guid.h>
+#include <library/cpp/yt/misc/guid.h>
namespace NYT::NKafka {
@@ -37,7 +37,6 @@ struct TTaggedField
TString Data;
void Serialize(IKafkaProtocolWriter* writer) const;
-
void Deserialize(IKafkaProtocolReader* reader);
};
@@ -87,7 +86,6 @@ struct TMessage
TString Value;
void Serialize(IKafkaProtocolWriter* writer, int version) const;
-
void Deserialize(IKafkaProtocolReader* reader, int version);
};
@@ -117,7 +115,6 @@ struct TRecord
std::vector<TMessage> Messages;
void Serialize(IKafkaProtocolWriter* writer) const;
-
void Deserialize(IKafkaProtocolReader* reader);
};
@@ -164,16 +161,13 @@ void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isComp
struct TReqApiVersions
{
+ static constexpr ERequestType RequestType = ERequestType::ApiVersions;
+
TString ClientSoftwareName;
TString ClientSoftwareVersion;
std::vector<TTaggedField> TagBuffer;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::ApiVersions;
- }
};
struct TRspApiKey
@@ -200,7 +194,7 @@ struct TRspApiVersions
struct TReqMetadataTopic
{
- TGUID TopicId;
+ TGuid TopicId;
TString Topic;
std::vector<TTaggedField> TagBuffer;
@@ -209,6 +203,8 @@ struct TReqMetadataTopic
struct TReqMetadata
{
+ static constexpr ERequestType RequestType = ERequestType::Metadata;
+
std::vector<TReqMetadataTopic> Topics;
bool AllowAutoTopicCreation;
bool IncludeClusterAuthorizedOperations;
@@ -216,11 +212,6 @@ struct TReqMetadata
std::vector<TTaggedField> TagBuffer;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::Metadata;
- }
};
struct TRspMetadataBroker
@@ -253,7 +244,7 @@ struct TRspMetadataTopic
{
EErrorCode ErrorCode = EErrorCode::None;
TString Name;
- TGUID TopicId;
+ TGuid TopicId;
bool IsInternal = false;
std::vector<TRspMetadataTopicPartition> Partitions;
i32 TopicAuthorizedOperations = 0;
@@ -278,14 +269,11 @@ struct TRspMetadata
struct TReqFindCoordinator
{
+ static constexpr ERequestType RequestType = ERequestType::FindCoordinator;
+
TString Key;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::FindCoordinator;
- }
};
struct TRspFindCoordinator
@@ -310,6 +298,8 @@ struct TReqJoinGroupProtocol
struct TReqJoinGroup
{
+ static constexpr ERequestType RequestType = ERequestType::JoinGroup;
+
TString GroupId;
i32 SessionTimeoutMs = 0;
TString MemberId;
@@ -317,11 +307,6 @@ struct TReqJoinGroup
std::vector<TReqJoinGroupProtocol> Protocols;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::JoinGroup;
- }
};
struct TRspJoinGroupMember
@@ -356,17 +341,14 @@ struct TReqSyncGroupAssignment
struct TReqSyncGroup
{
+ static constexpr ERequestType RequestType = ERequestType::SyncGroup;
+
TString GroupId;
TString GenerationId;
TString MemberId;
std::vector<TReqSyncGroupAssignment> Assignments;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::SyncGroup;
- }
};
struct TRspSyncGroupAssignment
@@ -389,16 +371,13 @@ struct TRspSyncGroup
struct TReqHeartbeat
{
+ static constexpr ERequestType RequestType = ERequestType::Heartbeat;
+
TString GroupId;
i32 GenerationId = 0;
TString MemberId;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::Heartbeat;
- }
};
struct TRspHeartbeat
@@ -429,15 +408,12 @@ struct TReqOffsetCommitTopic
struct TReqOffsetCommit
{
+ static constexpr ERequestType RequestType = ERequestType::OffsetCommit;
+
TString GroupId;
std::vector<TReqOffsetCommitTopic> Topics;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::OffsetCommit;
- }
};
struct TRspOffsetCommitTopicPartition
@@ -474,15 +450,12 @@ struct TReqOffsetFetchTopic
struct TReqOffsetFetch
{
+ static constexpr ERequestType RequestType = ERequestType::OffsetFetch;
+
TString GroupId;
std::vector<TReqOffsetFetchTopic> Topics;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::OffsetFetch;
- }
};
struct TRspOffsetFetchTopicPartition
@@ -531,17 +504,14 @@ struct TReqFetchTopic
struct TReqFetch
{
+ static constexpr ERequestType RequestType = ERequestType::Fetch;
+
i32 ReplicaId = 0;
i32 MaxWaitMs = 0;
i32 MinBytes = 0;
std::vector<TReqFetchTopic> Topics;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::Fetch;
- }
};
struct TRspFetchResponsePartition
@@ -573,14 +543,11 @@ struct TRspFetch
struct TReqSaslHandshake
{
+ static constexpr ERequestType RequestType = ERequestType::SaslHandshake;
+
TString Mechanism;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::SaslHandshake;
- }
};
struct TRspSaslHandshake
@@ -595,14 +562,11 @@ struct TRspSaslHandshake
struct TReqSaslAuthenticate
{
+ static constexpr ERequestType RequestType = ERequestType::SaslAuthenticate;
+
TString AuthBytes;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::SaslAuthenticate;
- }
};
struct TRspSaslAuthenticate
@@ -636,6 +600,8 @@ struct TReqProduceTopicData
struct TReqProduce
{
+ static constexpr ERequestType RequestType = ERequestType::Produce;
+
std::optional<TString> TransactionalId;
i16 Acks = 0;
i32 TimeoutMs = 0;
@@ -643,11 +609,6 @@ struct TReqProduce
std::vector<TTaggedField> TagBuffer;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
-
- static ERequestType GetRequestType()
- {
- return ERequestType::Produce;
- }
};
struct TRspProduceResponsePartitionResponseRecordError