aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-19 10:06:34 +0300
committertesseract <tesseract@yandex-team.com>2023-09-19 10:20:57 +0300
commit5da445c080d4a526cdd2d0e825678cf7593ee03f (patch)
treeb8de633d191ab6b1a050b041f4fcc6192c73ed87
parent17a06defa7ddf179308a1b1820655a2e5f53f86b (diff)
downloadydb-5da445c080d4a526cdd2d0e825678cf7593ee03f.tar.gz
fix Varint length
-rw-r--r--ydb/core/kafka_proxy/kafka.h17
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h13
-rw-r--r--ydb/core/kafka_proxy/ut/ut_serialization.cpp9
3 files changed, 30 insertions, 9 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 270d7df2e6..72febe4430 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -308,6 +308,13 @@ void NormalizeNumber(T& value) {
#endif
}
+template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>>
+U AsUnsigned(S value) {
+ static constexpr ui8 Shift = (sizeof(T) << 3) - 1;
+ return (value << 1) ^ (value >> Shift);
+}
+
+
class TKafkaWritable {
public:
TKafkaWritable(TWritableBuf& buffer)
@@ -340,9 +347,7 @@ public:
template<class T, typename U = std::make_unsigned_t<T>>
void writeVarint(T value) {
- static constexpr ui8 Shift = (sizeof(T) << 3) - 1;
-
- writeUnsignedVarint<U>((value << 1) ^ (value >> Shift));
+ writeUnsignedVarint<U>(AsUnsigned<T>(value));
}
void write(const char* val, size_t length);
@@ -377,16 +382,16 @@ public:
U value = 0;
size_t i = 0;
- ui8 b;
+ U b;
while (((b = static_cast<ui8>(get())) & 0x80) != 0) {
if (i > MaxLength) {
ythrow yexception() << "illegal varint length";
}
- value |= ((U)(b & 0x7f)) << i;
+ value |= (b & 0x7f) << i;
i += 7;
}
- value |= ((U)b) << i;
+ value |= b << i;
return value;
}
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index af8b608435..0eb388ba52 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -27,16 +27,23 @@ struct TSizeCollector {
ui32 NumTaggedFields = 0;
};
+template<class T, typename U = std::make_unsigned_t<T>>
+size_t SizeOfUnsignedVarint(T v) {
+ static constexpr T Mask = Max<U>() - 0x7F;
-constexpr size_t SizeOfUnsignedVarint(ui64 value) {
+ U value = v;
size_t bytes = 1;
- while ((value & 0xffffffffffffff80L) != 0L) {
+ while ((value & Mask) != 0L) {
bytes += 1;
value >>= 7;
}
return bytes;
}
+template<class T>
+size_t SizeOfVarint(T value) {
+ return SizeOfUnsignedVarint(AsUnsigned<T>(value));
+}
template<TKafkaVersion min, TKafkaVersion max>
constexpr bool VersionAll() {
@@ -211,7 +218,7 @@ public:
inline static i64 DoSize(TKafkaVersion version, const TValueType& value) {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return SizeOfUnsignedVarint(value);
+ return SizeOfVarint(value);
} else {
return sizeof(TValueType);
}
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index 71a99af5b7..486f07e844 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -185,6 +185,11 @@ void CheckUnsignedVarint(const std::vector<T>& values) {
TKafkaReadable readable(sb.GetBuffer());
writable.writeUnsignedVarint(v);
+
+ UNIT_ASSERT_EQUAL_C(sb.Size(), NKafka::NPrivate::SizeOfUnsignedVarint<T>(v),
+ TStringBuilder() << "Size mismatch " << sb.Size() << " != " << NKafka::NPrivate::SizeOfUnsignedVarint<T>(v));
+
+
T r = readable.readUnsignedVarint<T>();
UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v);
}
@@ -207,6 +212,10 @@ void CheckVarint(const std::vector<T>& values) {
TKafkaReadable readable(sb.GetBuffer());
writable.writeVarint(v);
+
+ UNIT_ASSERT_EQUAL_C(sb.Size(), NKafka::NPrivate::SizeOfVarint<T>(v),
+ TStringBuilder() << "Size mismatch " << sb.Size() << " != " << NKafka::NPrivate::SizeOfVarint<T>(v));
+
T r = readable.readVarint<T>();
UNIT_ASSERT_EQUAL_C(r, v, TStringBuilder() << r << " != " << v);