aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-18 21:35:29 +0300
committertesseract <tesseract@yandex-team.com>2023-09-18 22:01:34 +0300
commitc8409f20e76dba22a1e9928a4bdaa9595cf91e46 (patch)
tree979f98a82d9af1e0cd9dc4b4b74b0092d0b68ad4
parent37b099086946de1c276f0c4d9e2f102202e1d999 (diff)
downloadydb-c8409f20e76dba22a1e9928a4bdaa9595cf91e46.tar.gz
fix varint serialization
-rw-r--r--ydb/core/kafka_proxy/kafka.h51
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp344
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.cpp36
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h16
-rw-r--r--ydb/core/kafka_proxy/ut/ut_serialization.cpp59
5 files changed, 272 insertions, 234 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index d085bb2d25..270d7df2e6 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -324,8 +324,27 @@ public:
TKafkaWritable& operator<<(const TKafkaRawBytes& val);
TKafkaWritable& operator<<(const TKafkaRawString& val);
- void writeUnsignedVarint(TKafkaUint32 val);
- void writeVarint(TKafkaInt32 val);
+ template<class T, typename U = std::make_unsigned_t<T>>
+ void writeUnsignedVarint(T v) {
+ static constexpr T Mask = Max<T>() - 0x7F;
+
+ U value = v;
+ while ((value & Mask) != 0L) {
+ ui8 b = (ui8) ((value & 0x7f) | 0x80);
+ write((const char*)&b, sizeof(b));
+ value >>= 7;
+ }
+ ui8 b = (ui8) value;
+ write((const char*)&b, sizeof(b));
+ }
+
+ template<class T, typename U = std::make_unsigned_t<T>>
+ void writeVarint(T value) {
+ static constexpr ui8 Shift = (sizeof(T) << 3) - 1;
+
+ writeUnsignedVarint<U>((value << 1) ^ (value >> Shift));
+ }
+
void write(const char* val, size_t length);
private:
@@ -351,8 +370,32 @@ public:
void read(char* val, size_t length);
char get();
- ui32 readUnsignedVarint();
- i32 readVarint();
+
+ template<class T, typename U = std::make_unsigned_t<T>>
+ T readUnsignedVarint() {
+ static constexpr size_t MaxLength = (sizeof(T) << 3) - 4;
+
+ U value = 0;
+ size_t i = 0;
+ ui8 b;
+ while (((b = static_cast<ui8>(get())) & 0x80) != 0) {
+ if (i > MaxLength) {
+ ythrow yexception() << "illegal varint length";
+ }
+ value |= ((U)(b & 0x7f)) << i;
+ i += 7;
+ }
+
+ value |= ((U)b) << i;
+ return value;
+ }
+
+ template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>>
+ S readVarint() {
+ U v = readUnsignedVarint<U>();
+ return (v >> 1) ^ -static_cast<S>(v & 1);
+ }
+
TArrayRef<const char> Bytes(size_t length);
// returns a character from the specified position. The current position does not change.
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 89b32e0efe..55b2ab1fa4 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -196,10 +196,10 @@ void TRequestHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version)
NPrivate::Read<ClientIdMeta>(_readable, _version, ClientId);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -255,10 +255,10 @@ void TResponseHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version
NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -315,10 +315,10 @@ void TProduceRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version
NPrivate::Read<TopicDataMeta>(_readable, _version, TopicData);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -375,10 +375,10 @@ void TProduceRequestData::TTopicProduceData::Read(TKafkaReadable& _readable, TKa
NPrivate::Read<PartitionDataMeta>(_readable, _version, PartitionData);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -431,10 +431,10 @@ void TProduceRequestData::TTopicProduceData::TPartitionProduceData::Read(TKafkaR
NPrivate::Read<RecordsMeta>(_readable, _version, Records);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -487,10 +487,10 @@ void TProduceResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _versio
NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -543,10 +543,10 @@ void TProduceResponseData::TTopicProduceResponse::Read(TKafkaReadable& _readable
NPrivate::Read<PartitionResponsesMeta>(_readable, _version, PartitionResponses);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -614,10 +614,10 @@ void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Rea
NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -682,10 +682,10 @@ void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBa
NPrivate::Read<BatchIndexErrorMessageMeta>(_readable, _version, BatchIndexErrorMessage);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -763,10 +763,10 @@ void TFetchRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version)
NPrivate::Read<RackIdMeta>(_readable, _version, RackId);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
case ClusterIdMeta::Tag:
NPrivate::ReadTag<ClusterIdMeta>(_readable, _version, ClusterId);
@@ -844,10 +844,10 @@ void TFetchRequestData::TFetchTopic::Read(TKafkaReadable& _readable, TKafkaVersi
NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -916,10 +916,10 @@ void TFetchRequestData::TFetchTopic::TFetchPartition::Read(TKafkaReadable& _read
NPrivate::Read<PartitionMaxBytesMeta>(_readable, _version, PartitionMaxBytes);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -983,10 +983,10 @@ void TFetchRequestData::TForgottenTopic::Read(TKafkaReadable& _readable, TKafkaV
NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1047,10 +1047,10 @@ void TFetchResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version)
NPrivate::Read<ResponsesMeta>(_readable, _version, Responses);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1110,10 +1110,10 @@ void TFetchResponseData::TFetchableTopicResponse::Read(TKafkaReadable& _readable
NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1187,10 +1187,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::Read(TKafkaRea
NPrivate::Read<RecordsMeta>(_readable, _version, Records);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
case DivergingEpochMeta::Tag:
NPrivate::ReadTag<DivergingEpochMeta>(_readable, _version, DivergingEpoch);
@@ -1275,10 +1275,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffse
NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1333,10 +1333,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEp
NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1391,10 +1391,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::R
NPrivate::Read<EpochMeta>(_readable, _version, Epoch);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1449,10 +1449,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransa
NPrivate::Read<FirstOffsetMeta>(_readable, _version, FirstOffset);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1508,10 +1508,10 @@ void TListOffsetsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _ver
NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1566,10 +1566,10 @@ void TListOffsetsRequestData::TListOffsetsTopic::Read(TKafkaReadable& _readable,
NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1630,10 +1630,10 @@ void TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::Read(TKa
NPrivate::Read<MaxNumOffsetsMeta>(_readable, _version, MaxNumOffsets);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1690,10 +1690,10 @@ void TListOffsetsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _ve
NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1746,10 +1746,10 @@ void TListOffsetsResponseData::TListOffsetsTopicResponse::Read(TKafkaReadable& _
NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1814,10 +1814,10 @@ void TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionR
NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1884,10 +1884,10 @@ void TMetadataRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _versio
NPrivate::Read<IncludeTopicAuthorizedOperationsMeta>(_readable, _version, IncludeTopicAuthorizedOperations);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -1946,10 +1946,10 @@ void TMetadataRequestData::TMetadataRequestTopic::Read(TKafkaReadable& _readable
NPrivate::Read<NameMeta>(_readable, _version, Name);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2012,10 +2012,10 @@ void TMetadataResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _versi
NPrivate::Read<ClusterAuthorizedOperationsMeta>(_readable, _version, ClusterAuthorizedOperations);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2084,10 +2084,10 @@ void TMetadataResponseData::TMetadataResponseBroker::Read(TKafkaReadable& _reada
NPrivate::Read<RackMeta>(_readable, _version, Rack);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2156,10 +2156,10 @@ void TMetadataResponseData::TMetadataResponseTopic::Read(TKafkaReadable& _readab
NPrivate::Read<TopicAuthorizedOperationsMeta>(_readable, _version, TopicAuthorizedOperations);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2231,10 +2231,10 @@ void TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::
NPrivate::Read<OfflineReplicasMeta>(_readable, _version, OfflineReplicas);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2296,10 +2296,10 @@ void TSaslHandshakeRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _v
NPrivate::Read<MechanismMeta>(_readable, _version, Mechanism);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2350,10 +2350,10 @@ void TSaslHandshakeResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _
NPrivate::Read<MechanismsMeta>(_readable, _version, Mechanisms);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2408,10 +2408,10 @@ void TApiVersionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _ver
NPrivate::Read<ClientSoftwareVersionMeta>(_readable, _version, ClientSoftwareVersion);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2475,10 +2475,10 @@ void TApiVersionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _ve
NPrivate::Read<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
case SupportedFeaturesMeta::Tag:
NPrivate::ReadTag<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures);
@@ -2562,10 +2562,10 @@ void TApiVersionsResponseData::TApiVersion::Read(TKafkaReadable& _readable, TKaf
NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2625,10 +2625,10 @@ void TApiVersionsResponseData::TSupportedFeatureKey::Read(TKafkaReadable& _reada
NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2688,10 +2688,10 @@ void TApiVersionsResponseData::TFinalizedFeatureKey::Read(TKafkaReadable& _reada
NPrivate::Read<MinVersionLevelMeta>(_readable, _version, MinVersionLevel);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2754,10 +2754,10 @@ void TInitProducerIdRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _
NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2822,10 +2822,10 @@ void TInitProducerIdResponseData::Read(TKafkaReadable& _readable, TKafkaVersion
NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2879,10 +2879,10 @@ void TSaslAuthenticateRequestData::Read(TKafkaReadable& _readable, TKafkaVersion
NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
@@ -2939,10 +2939,10 @@ void TSaslAuthenticateResponseData::Read(TKafkaReadable& _readable, TKafkaVersio
NPrivate::Read<SessionLifetimeMsMeta>(_readable, _version, SessionLifetimeMs);
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
- int _numTaggedFields = _readable.readUnsignedVarint();
- for (int _i = 0; _i < _numTaggedFields; ++_i) {
- int _tag = _readable.readUnsignedVarint();
- int _size = _readable.readUnsignedVarint();
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
switch (_tag) {
default:
_readable.skip(_size); // skip unknown tag
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp
index f592c8e4e4..9604a4d335 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp
@@ -19,20 +19,6 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
return *this;
}
-void TKafkaWritable::writeUnsignedVarint(TKafkaUint32 value) {
- while ((value & 0xffffff80) != 0L) {
- ui8 b = (ui8) ((value & 0x7f) | 0x80);
- write((const char*)&b, sizeof(b));
- value >>= 7;
- }
- ui8 b = (ui8) value;
- write((const char*)&b, sizeof(b));
-}
-
-void TKafkaWritable::writeVarint(TKafkaInt32 value) {
- writeUnsignedVarint((value << 1) ^ (value >> 31));
-}
-
void TKafkaWritable::write(const char* val, size_t length) {
Buffer.write(val, length);
}
@@ -67,28 +53,6 @@ TArrayRef<const char> TKafkaReadable::Bytes(size_t length) {
return r;
}
-ui32 TKafkaReadable::readUnsignedVarint() {
- ui32 value = 0;
- ui32 i = 0;
- ui16 b;
- while (((b = get()) & 0x80) != 0) {
-
- value |= ((ui32)(b & 0x7f)) << i;
- i += 7;
- if (i > 28) {
- ythrow yexception() << "illegal varint length";
- }
- }
-
- value |= b << i;
- return value;
-}
-
-i32 TKafkaReadable::readVarint() {
- ui32 v = readUnsignedVarint();
- return (v >> 1) ^ -(v & 1);
-}
-
void TKafkaReadable::skip(size_t length) {
checkEof(length);
Position += length;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 7461d0c78d..af8b608435 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -28,9 +28,9 @@ struct TSizeCollector {
};
-constexpr i32 SizeOfUnsignedVarint(i32 value) {
- int bytes = 1;
- while ((value & 0xffffff80) != 0L) {
+constexpr size_t SizeOfUnsignedVarint(ui64 value) {
+ size_t bytes = 1;
+ while ((value & 0xffffffffffffff80L) != 0L) {
bytes += 1;
value >>= 7;
}
@@ -106,7 +106,7 @@ inline void WriteStringSize(TKafkaWritable& writable, TKafkaVersion version, TKa
template<typename Meta>
inline TKafkaInt32 ReadStringSize(TKafkaReadable& readable, TKafkaVersion version) {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return readable.readUnsignedVarint() - 1;
+ return readable.readUnsignedVarint<TKafkaInt32>() - 1;
} else {
TKafkaInt16 v;
readable >> v;
@@ -128,9 +128,9 @@ inline void WriteArraySize(TKafkaWritable& writable, TKafkaVersion version, TKaf
template<typename Meta>
inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version) {
if constexpr (SizeFormat<Meta>() == Varint) {
- return readable.readVarint();
+ return readable.readVarint<TKafkaInt32>();
} else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return readable.readUnsignedVarint() - 1;
+ return readable.readUnsignedVarint<TKafkaInt32>() - 1;
} else {
TKafkaInt32 v;
readable >> v;
@@ -203,7 +203,7 @@ public:
inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TValueType& value) {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- value = readable.readVarint();
+ value = readable.readVarint<TValueType>();
} else {
readable >> value;
}
@@ -250,7 +250,7 @@ public:
inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TValueType& value) {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- value = readable.readUnsignedVarint() - 1;
+ value = readable.readUnsignedVarint<TValueType>() - 1;
} else {
readable >> value;
}
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index b58b92995c..71a99af5b7 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -176,20 +176,51 @@ Y_UNIT_TEST(ProduceRequest) {
UNIT_ASSERT_EQUAL(result.TopicData[1].PartitionData[0].Records, std::nullopt);
}
-Y_UNIT_TEST(UnsignedVarint) {
- std::vector<ui32> values = {0, 1, 127, 128, 32191};
-
- for(ui32 v : values) {
+template<class T>
+void CheckUnsignedVarint(const std::vector<T>& values) {
+ for(T v : values) {
+ Cerr << ">>>>> Check value=" << v << Endl << Flush;
TWritableBuf sb(nullptr, BUFFER_SIZE);
TKafkaWritable writable(sb);
TKafkaReadable readable(sb.GetBuffer());
writable.writeUnsignedVarint(v);
- ui32 r = readable.readUnsignedVarint();
- UNIT_ASSERT_EQUAL(r, v);
+ T r = readable.readUnsignedVarint<T>();
+ UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v);
}
}
+Y_UNIT_TEST(UnsignedVarint32) {
+ CheckUnsignedVarint<ui32>({0, 1, 127, 128, 32191, Max<i32>(), Max<ui32>()});
+}
+
+Y_UNIT_TEST(UnsignedVarint64) {
+ CheckUnsignedVarint<ui64>({0, 1, 127, 128, 32191, Max<i32>(), static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>(), Max<ui64>()});
+}
+
+template<class T>
+void CheckVarint(const std::vector<T>& values) {
+ for(T v : values) {
+ Cerr << ">>>>> Check value=" << v << Endl << Flush;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
+ TKafkaWritable writable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
+
+ writable.writeVarint(v);
+ T r = readable.readVarint<T>();
+
+ UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v);
+ }
+}
+
+Y_UNIT_TEST(Varint32) {
+ CheckVarint<i32>({ Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, Max<i32>()});
+}
+
+Y_UNIT_TEST(Varint64) {
+ CheckVarint<i64>({Min<i64>(), Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>()});
+}
+
#define SIMPLE_HEAD(Type_, Value) \
Meta_##Type_::Type value = Value; \
Meta_##Type_::Type result; \
@@ -251,10 +282,10 @@ Y_UNIT_TEST(TKafkaInt8_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint();
+ i32 tag = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaInt8::Tag);
- ui32 size = readable.readUnsignedVarint();
+ ui32 size = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(size, sizeof(TKafkaInt8));
NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result);
@@ -337,10 +368,10 @@ Y_UNIT_TEST(TKafkaString_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint();
+ i32 tag = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaString::Tag);
- ui32 size = readable.readUnsignedVarint();
+ ui32 size = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0
NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result);
@@ -398,10 +429,10 @@ Y_UNIT_TEST(TKafkaArray_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint();
+ i32 tag = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag);
- ui32 size = readable.readUnsignedVarint();
+ ui32 size = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(size, v.length() // array element data
+ NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size
+ NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length
@@ -464,10 +495,10 @@ Y_UNIT_TEST(TKafkaBytes_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint();
+ i32 tag = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag);
- ui32 size = readable.readUnsignedVarint();
+ ui32 size = readable.readUnsignedVarint<i32>();
UNIT_ASSERT_EQUAL(size, value->size() // byffer data
+ NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0
);