diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-18 21:35:29 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-18 22:01:34 +0300 |
commit | c8409f20e76dba22a1e9928a4bdaa9595cf91e46 (patch) | |
tree | 979f98a82d9af1e0cd9dc4b4b74b0092d0b68ad4 | |
parent | 37b099086946de1c276f0c4d9e2f102202e1d999 (diff) | |
download | ydb-c8409f20e76dba22a1e9928a4bdaa9595cf91e46.tar.gz |
fix varint serialization
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h | 51 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.cpp | 344 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.h | 16 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_serialization.cpp | 59 |
5 files changed, 272 insertions, 234 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index d085bb2d25..270d7df2e6 100644 --- a/ydb/core/kafka_proxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h @@ -324,8 +324,27 @@ public: TKafkaWritable& operator<<(const TKafkaRawBytes& val); TKafkaWritable& operator<<(const TKafkaRawString& val); - void writeUnsignedVarint(TKafkaUint32 val); - void writeVarint(TKafkaInt32 val); + template<class T, typename U = std::make_unsigned_t<T>> + void writeUnsignedVarint(T v) { + static constexpr T Mask = Max<T>() - 0x7F; + + U value = v; + while ((value & Mask) != 0L) { + ui8 b = (ui8) ((value & 0x7f) | 0x80); + write((const char*)&b, sizeof(b)); + value >>= 7; + } + ui8 b = (ui8) value; + write((const char*)&b, sizeof(b)); + } + + template<class T, typename U = std::make_unsigned_t<T>> + void writeVarint(T value) { + static constexpr ui8 Shift = (sizeof(T) << 3) - 1; + + writeUnsignedVarint<U>((value << 1) ^ (value >> Shift)); + } + void write(const char* val, size_t length); private: @@ -351,8 +370,32 @@ public: void read(char* val, size_t length); char get(); - ui32 readUnsignedVarint(); - i32 readVarint(); + + template<class T, typename U = std::make_unsigned_t<T>> + T readUnsignedVarint() { + static constexpr size_t MaxLength = (sizeof(T) << 3) - 4; + + U value = 0; + size_t i = 0; + ui8 b; + while (((b = static_cast<ui8>(get())) & 0x80) != 0) { + if (i > MaxLength) { + ythrow yexception() << "illegal varint length"; + } + value |= ((U)(b & 0x7f)) << i; + i += 7; + } + + value |= ((U)b) << i; + return value; + } + + template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>> + S readVarint() { + U v = readUnsignedVarint<U>(); + return (v >> 1) ^ -static_cast<S>(v & 1); + } + TArrayRef<const char> Bytes(size_t length); // returns a character from the specified position. The current position does not change. diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 89b32e0efe..55b2ab1fa4 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -196,10 +196,10 @@ void TRequestHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version) NPrivate::Read<ClientIdMeta>(_readable, _version, ClientId); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -255,10 +255,10 @@ void TResponseHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -315,10 +315,10 @@ void TProduceRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version NPrivate::Read<TopicDataMeta>(_readable, _version, TopicData); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -375,10 +375,10 @@ void TProduceRequestData::TTopicProduceData::Read(TKafkaReadable& _readable, TKa NPrivate::Read<PartitionDataMeta>(_readable, _version, PartitionData); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -431,10 +431,10 @@ void TProduceRequestData::TTopicProduceData::TPartitionProduceData::Read(TKafkaR NPrivate::Read<RecordsMeta>(_readable, _version, Records); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -487,10 +487,10 @@ void TProduceResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _versio NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -543,10 +543,10 @@ void TProduceResponseData::TTopicProduceResponse::Read(TKafkaReadable& _readable NPrivate::Read<PartitionResponsesMeta>(_readable, _version, PartitionResponses); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -614,10 +614,10 @@ void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Rea NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -682,10 +682,10 @@ void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBa NPrivate::Read<BatchIndexErrorMessageMeta>(_readable, _version, BatchIndexErrorMessage); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -763,10 +763,10 @@ void TFetchRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) NPrivate::Read<RackIdMeta>(_readable, _version, RackId); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { case ClusterIdMeta::Tag: NPrivate::ReadTag<ClusterIdMeta>(_readable, _version, ClusterId); @@ -844,10 +844,10 @@ void TFetchRequestData::TFetchTopic::Read(TKafkaReadable& _readable, TKafkaVersi NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -916,10 +916,10 @@ void TFetchRequestData::TFetchTopic::TFetchPartition::Read(TKafkaReadable& _read NPrivate::Read<PartitionMaxBytesMeta>(_readable, _version, PartitionMaxBytes); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -983,10 +983,10 @@ void TFetchRequestData::TForgottenTopic::Read(TKafkaReadable& _readable, TKafkaV NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1047,10 +1047,10 @@ void TFetchResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) NPrivate::Read<ResponsesMeta>(_readable, _version, Responses); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1110,10 +1110,10 @@ void TFetchResponseData::TFetchableTopicResponse::Read(TKafkaReadable& _readable NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1187,10 +1187,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::Read(TKafkaRea NPrivate::Read<RecordsMeta>(_readable, _version, Records); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { case DivergingEpochMeta::Tag: NPrivate::ReadTag<DivergingEpochMeta>(_readable, _version, DivergingEpoch); @@ -1275,10 +1275,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffse NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1333,10 +1333,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEp NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1391,10 +1391,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::R NPrivate::Read<EpochMeta>(_readable, _version, Epoch); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1449,10 +1449,10 @@ void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransa NPrivate::Read<FirstOffsetMeta>(_readable, _version, FirstOffset); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1508,10 +1508,10 @@ void TListOffsetsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _ver NPrivate::Read<TopicsMeta>(_readable, _version, Topics); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1566,10 +1566,10 @@ void TListOffsetsRequestData::TListOffsetsTopic::Read(TKafkaReadable& _readable, NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1630,10 +1630,10 @@ void TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition::Read(TKa NPrivate::Read<MaxNumOffsetsMeta>(_readable, _version, MaxNumOffsets); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1690,10 +1690,10 @@ void TListOffsetsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _ve NPrivate::Read<TopicsMeta>(_readable, _version, Topics); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1746,10 +1746,10 @@ void TListOffsetsResponseData::TListOffsetsTopicResponse::Read(TKafkaReadable& _ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1814,10 +1814,10 @@ void TListOffsetsResponseData::TListOffsetsTopicResponse::TListOffsetsPartitionR NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1884,10 +1884,10 @@ void TMetadataRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _versio NPrivate::Read<IncludeTopicAuthorizedOperationsMeta>(_readable, _version, IncludeTopicAuthorizedOperations); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -1946,10 +1946,10 @@ void TMetadataRequestData::TMetadataRequestTopic::Read(TKafkaReadable& _readable NPrivate::Read<NameMeta>(_readable, _version, Name); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2012,10 +2012,10 @@ void TMetadataResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _versi NPrivate::Read<ClusterAuthorizedOperationsMeta>(_readable, _version, ClusterAuthorizedOperations); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2084,10 +2084,10 @@ void TMetadataResponseData::TMetadataResponseBroker::Read(TKafkaReadable& _reada NPrivate::Read<RackMeta>(_readable, _version, Rack); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2156,10 +2156,10 @@ void TMetadataResponseData::TMetadataResponseTopic::Read(TKafkaReadable& _readab NPrivate::Read<TopicAuthorizedOperationsMeta>(_readable, _version, TopicAuthorizedOperations); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2231,10 +2231,10 @@ void TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition:: NPrivate::Read<OfflineReplicasMeta>(_readable, _version, OfflineReplicas); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2296,10 +2296,10 @@ void TSaslHandshakeRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _v NPrivate::Read<MechanismMeta>(_readable, _version, Mechanism); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2350,10 +2350,10 @@ void TSaslHandshakeResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _ NPrivate::Read<MechanismsMeta>(_readable, _version, Mechanisms); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2408,10 +2408,10 @@ void TApiVersionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _ver NPrivate::Read<ClientSoftwareVersionMeta>(_readable, _version, ClientSoftwareVersion); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2475,10 +2475,10 @@ void TApiVersionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _ve NPrivate::Read<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { case SupportedFeaturesMeta::Tag: NPrivate::ReadTag<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures); @@ -2562,10 +2562,10 @@ void TApiVersionsResponseData::TApiVersion::Read(TKafkaReadable& _readable, TKaf NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2625,10 +2625,10 @@ void TApiVersionsResponseData::TSupportedFeatureKey::Read(TKafkaReadable& _reada NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2688,10 +2688,10 @@ void TApiVersionsResponseData::TFinalizedFeatureKey::Read(TKafkaReadable& _reada NPrivate::Read<MinVersionLevelMeta>(_readable, _version, MinVersionLevel); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2754,10 +2754,10 @@ void TInitProducerIdRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _ NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2822,10 +2822,10 @@ void TInitProducerIdResponseData::Read(TKafkaReadable& _readable, TKafkaVersion NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2879,10 +2879,10 @@ void TSaslAuthenticateRequestData::Read(TKafkaReadable& _readable, TKafkaVersion NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag @@ -2939,10 +2939,10 @@ void TSaslAuthenticateResponseData::Read(TKafkaReadable& _readable, TKafkaVersio NPrivate::Read<SessionLifetimeMsMeta>(_readable, _version, SessionLifetimeMs); if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { - int _numTaggedFields = _readable.readUnsignedVarint(); - for (int _i = 0; _i < _numTaggedFields; ++_i) { - int _tag = _readable.readUnsignedVarint(); - int _size = _readable.readUnsignedVarint(); + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); switch (_tag) { default: _readable.skip(_size); // skip unknown tag diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp index f592c8e4e4..9604a4d335 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.cpp +++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp @@ -19,20 +19,6 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) { return *this; } -void TKafkaWritable::writeUnsignedVarint(TKafkaUint32 value) { - while ((value & 0xffffff80) != 0L) { - ui8 b = (ui8) ((value & 0x7f) | 0x80); - write((const char*)&b, sizeof(b)); - value >>= 7; - } - ui8 b = (ui8) value; - write((const char*)&b, sizeof(b)); -} - -void TKafkaWritable::writeVarint(TKafkaInt32 value) { - writeUnsignedVarint((value << 1) ^ (value >> 31)); -} - void TKafkaWritable::write(const char* val, size_t length) { Buffer.write(val, length); } @@ -67,28 +53,6 @@ TArrayRef<const char> TKafkaReadable::Bytes(size_t length) { return r; } -ui32 TKafkaReadable::readUnsignedVarint() { - ui32 value = 0; - ui32 i = 0; - ui16 b; - while (((b = get()) & 0x80) != 0) { - - value |= ((ui32)(b & 0x7f)) << i; - i += 7; - if (i > 28) { - ythrow yexception() << "illegal varint length"; - } - } - - value |= b << i; - return value; -} - -i32 TKafkaReadable::readVarint() { - ui32 v = readUnsignedVarint(); - return (v >> 1) ^ -(v & 1); -} - void TKafkaReadable::skip(size_t length) { checkEof(length); Position += length; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 7461d0c78d..af8b608435 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -28,9 +28,9 @@ struct TSizeCollector { }; -constexpr i32 SizeOfUnsignedVarint(i32 value) { - int bytes = 1; - while ((value & 0xffffff80) != 0L) { +constexpr size_t SizeOfUnsignedVarint(ui64 value) { + size_t bytes = 1; + while ((value & 0xffffffffffffff80L) != 0L) { bytes += 1; value >>= 7; } @@ -106,7 +106,7 @@ inline void WriteStringSize(TKafkaWritable& writable, TKafkaVersion version, TKa template<typename Meta> inline TKafkaInt32 ReadStringSize(TKafkaReadable& readable, TKafkaVersion version) { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return readable.readUnsignedVarint() - 1; + return readable.readUnsignedVarint<TKafkaInt32>() - 1; } else { TKafkaInt16 v; readable >> v; @@ -128,9 +128,9 @@ inline void WriteArraySize(TKafkaWritable& writable, TKafkaVersion version, TKaf template<typename Meta> inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version) { if constexpr (SizeFormat<Meta>() == Varint) { - return readable.readVarint(); + return readable.readVarint<TKafkaInt32>(); } else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return readable.readUnsignedVarint() - 1; + return readable.readUnsignedVarint<TKafkaInt32>() - 1; } else { TKafkaInt32 v; readable >> v; @@ -203,7 +203,7 @@ public: inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TValueType& value) { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - value = readable.readVarint(); + value = readable.readVarint<TValueType>(); } else { readable >> value; } @@ -250,7 +250,7 @@ public: inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TValueType& value) { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - value = readable.readUnsignedVarint() - 1; + value = readable.readUnsignedVarint<TValueType>() - 1; } else { readable >> value; } diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp index b58b92995c..71a99af5b7 100644 --- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp +++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp @@ -176,20 +176,51 @@ Y_UNIT_TEST(ProduceRequest) { UNIT_ASSERT_EQUAL(result.TopicData[1].PartitionData[0].Records, std::nullopt); } -Y_UNIT_TEST(UnsignedVarint) { - std::vector<ui32> values = {0, 1, 127, 128, 32191}; - - for(ui32 v : values) { +template<class T> +void CheckUnsignedVarint(const std::vector<T>& values) { + for(T v : values) { + Cerr << ">>>>> Check value=" << v << Endl << Flush; TWritableBuf sb(nullptr, BUFFER_SIZE); TKafkaWritable writable(sb); TKafkaReadable readable(sb.GetBuffer()); writable.writeUnsignedVarint(v); - ui32 r = readable.readUnsignedVarint(); - UNIT_ASSERT_EQUAL(r, v); + T r = readable.readUnsignedVarint<T>(); + UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v); } } +Y_UNIT_TEST(UnsignedVarint32) { + CheckUnsignedVarint<ui32>({0, 1, 127, 128, 32191, Max<i32>(), Max<ui32>()}); +} + +Y_UNIT_TEST(UnsignedVarint64) { + CheckUnsignedVarint<ui64>({0, 1, 127, 128, 32191, Max<i32>(), static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>(), Max<ui64>()}); +} + +template<class T> +void CheckVarint(const std::vector<T>& values) { + for(T v : values) { + Cerr << ">>>>> Check value=" << v << Endl << Flush; + TWritableBuf sb(nullptr, BUFFER_SIZE); + TKafkaWritable writable(sb); + TKafkaReadable readable(sb.GetBuffer()); + + writable.writeVarint(v); + T r = readable.readVarint<T>(); + + UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v); + } +} + +Y_UNIT_TEST(Varint32) { + CheckVarint<i32>({ Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, Max<i32>()}); +} + +Y_UNIT_TEST(Varint64) { + CheckVarint<i64>({Min<i64>(), Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>()}); +} + #define SIMPLE_HEAD(Type_, Value) \ Meta_##Type_::Type value = Value; \ Meta_##Type_::Type result; \ @@ -251,10 +282,10 @@ Y_UNIT_TEST(TKafkaInt8_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value); - i32 tag = readable.readUnsignedVarint(); + i32 tag = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaInt8::Tag); - ui32 size = readable.readUnsignedVarint(); + ui32 size = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(size, sizeof(TKafkaInt8)); NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result); @@ -337,10 +368,10 @@ Y_UNIT_TEST(TKafkaString_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value); - i32 tag = readable.readUnsignedVarint(); + i32 tag = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaString::Tag); - ui32 size = readable.readUnsignedVarint(); + ui32 size = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0 NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result); @@ -398,10 +429,10 @@ Y_UNIT_TEST(TKafkaArray_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value); - i32 tag = readable.readUnsignedVarint(); + i32 tag = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag); - ui32 size = readable.readUnsignedVarint(); + ui32 size = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(size, v.length() // array element data + NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size + NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length @@ -464,10 +495,10 @@ Y_UNIT_TEST(TKafkaBytes_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value); - i32 tag = readable.readUnsignedVarint(); + i32 tag = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag); - ui32 size = readable.readUnsignedVarint(); + ui32 size = readable.readUnsignedVarint<i32>(); UNIT_ASSERT_EQUAL(size, value->size() // byffer data + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0 ); |