diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-21 08:48:27 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-21 09:08:24 +0300 |
commit | 188f3e9e286e518a802c4da4122d4806b800cd6c (patch) | |
tree | a8572637e5b6bcc51d7661a2314e7131ce42cc56 | |
parent | 504aa21c0d64dde3b5c5827174f65d9d5a4c10c9 (diff) | |
download | ydb-188f3e9e286e518a802c4da4122d4806b800cd6c.tar.gz |
Cherry-picks Kafka commits from 23-3-12
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h | 14 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.h | 35 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_serialization.cpp | 53 |
4 files changed, 70 insertions, 34 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index 72febe4430..bbd8c0be2c 100644 --- a/ydb/core/kafka_proxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h @@ -333,7 +333,7 @@ public: template<class T, typename U = std::make_unsigned_t<T>> void writeUnsignedVarint(T v) { - static constexpr T Mask = Max<T>() - 0x7F; + static constexpr U Mask = Max<U>() - 0x7F; U value = v; while ((value & Mask) != 0L) { @@ -345,7 +345,7 @@ public: write((const char*)&b, sizeof(b)); } - template<class T, typename U = std::make_unsigned_t<T>> + template<std::signed_integral T, typename U = std::make_unsigned_t<T>> void writeVarint(T value) { writeUnsignedVarint<U>(AsUnsigned<T>(value)); } @@ -376,15 +376,15 @@ public: void read(char* val, size_t length); char get(); - template<class T, typename U = std::make_unsigned_t<T>> - T readUnsignedVarint() { - static constexpr size_t MaxLength = (sizeof(T) << 3) - 4; + template<std::unsigned_integral U> + U readUnsignedVarint() { + static constexpr size_t MaxLength = (sizeof(U) * 8 - 1) / 7 * 7; U value = 0; size_t i = 0; U b; while (((b = static_cast<ui8>(get())) & 0x80) != 0) { - if (i > MaxLength) { + if (i >= MaxLength) { ythrow yexception() << "illegal varint length"; } value |= (b & 0x7f) << i; @@ -395,7 +395,7 @@ public: return value; } - template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>> + template<std::signed_integral S, typename U = std::make_unsigned_t<S>> S readVarint() { U v = readUnsignedVarint<U>(); return (v >> 1) ^ -static_cast<S>(v & 1); diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index c381be6e45..6aaba8d911 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -49,10 +49,10 @@ public: static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); TEvPollerReady* InactivityEvent = nullptr; - TPollerToken::TPtr PollerToken; TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; + TPollerToken::TPtr PollerToken; TBufferedWriter Buffer; THPTimer InactivityTimer; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 36c963221d..b33ec1fd61 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -29,7 +29,7 @@ struct TSizeCollector { template<class T, typename U = std::make_unsigned_t<T>> size_t SizeOfUnsignedVarint(T v) { - static constexpr T Mask = Max<U>() - 0x7F; + static constexpr U Mask = Max<U>() - 0x7F; U value = v; size_t bytes = 1; @@ -113,7 +113,7 @@ inline void WriteStringSize(TKafkaWritable& writable, TKafkaVersion version, TKa template<typename Meta> inline TKafkaInt32 ReadStringSize(TKafkaReadable& readable, TKafkaVersion version) { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return readable.readUnsignedVarint<TKafkaInt32>() - 1; + return readable.readUnsignedVarint<ui32>() - 1; } else { TKafkaInt16 v; readable >> v; @@ -137,7 +137,7 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version if constexpr (SizeFormat<Meta>() == Varint) { return readable.readVarint<TKafkaInt32>(); } else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return readable.readUnsignedVarint<TKafkaInt32>() - 1; + return readable.readUnsignedVarint<ui32>() - 1; } else { TKafkaInt32 v; readable >> v; @@ -145,6 +145,17 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version } } +template<typename Meta> +inline TKafkaInt32 ArraySize(TKafkaVersion version, TKafkaInt32 size) { + if constexpr (SizeFormat<Meta>() == Varint) { + return SizeOfVarint(size); + } else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { + return SizeOfUnsignedVarint(size + 1); + } else { + return sizeof(TKafkaInt32); + } +} + inline IOutputStream& operator <<(IOutputStream& out, const TKafkaUuid& /*value*/) { return out << "---"; @@ -426,11 +437,7 @@ public: inline static i64 DoSize(TKafkaVersion version, const TKafkaBytes& value) { if (value) { const auto& v = *value; - if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return v.size() + SizeOfUnsignedVarint(v.size() + 1); - } else { - return v.size() + sizeof(TKafkaInt32); - } + return v.size() + ArraySize<Meta>(version, v.size()); } else { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { return 1; @@ -501,11 +508,7 @@ public: if (value) { const auto& v = *value; const auto size = v.Size(CURRENT_RECORD_VERSION); - if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return size + SizeOfUnsignedVarint(size + 1); - } else { - return size + sizeof(TKafkaInt32); - } + return size + ArraySize<Meta>(version, size); } else { if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { return 1; @@ -573,11 +576,7 @@ public: size += ItemStrategy::DoSize(version, v); } } - if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return size + SizeOfUnsignedVarint(value.size() + 1); - } else { - return size + sizeof(TKafkaInt32); - } + return size + ArraySize<Meta>(version, value.size()); } inline static void DoLog(const std::vector<TValueType>& value) { diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp index 49eaf479e0..c45793fd30 100644 --- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp +++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp @@ -244,6 +244,43 @@ Y_UNIT_TEST(Varint64) { CheckVarint<i64>({Min<i64>(), Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>()}); } +template<class T> +void CheckVarint_WrongBytes(std::vector<ui8> bytes) { + TWritableBuf sb(nullptr, BUFFER_SIZE); + TKafkaWritable writable(sb); + TKafkaReadable readable(sb.GetBuffer()); + + writable.write((char*)bytes.data(), bytes.size()); + + try { + readable.readUnsignedVarint<T>(); + UNIT_FAIL("Must be exception"); + } catch(const yexception& e) { + UNIT_ASSERT_STRING_CONTAINS(e.what(), "illegal varint length"); + } +} + +Y_UNIT_TEST(UnsignedVarint32_Wrong) { + CheckVarint_WrongBytes<ui32>({0xFF, 0xFF, 0xFF, 0xFF, 0xFF}); +} + +Y_UNIT_TEST(UnsignedVarint64_Wrong) { + CheckVarint_WrongBytes<ui64>({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}); +} + +Y_UNIT_TEST(UnsignedVarint32_Deserialize) { + std::vector<ui8> bytes = {0x81, 0x83, 0x05}; + + TWritableBuf sb(nullptr, BUFFER_SIZE); + TKafkaWritable writable(sb); + TKafkaReadable readable(sb.GetBuffer()); + + writable.write((char*)bytes.data(), bytes.size()); + + ui32 result = readable.readUnsignedVarint<ui32>(); + UNIT_ASSERT_EQUAL(result, 1 + (3 << 7) + (5 << 14)); +} + #define SIMPLE_HEAD(Type_, Value) \ Meta_##Type_::Type value = Value; \ Meta_##Type_::Type result; \ @@ -305,10 +342,10 @@ Y_UNIT_TEST(TKafkaInt8_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value); - i32 tag = readable.readUnsignedVarint<i32>(); + ui32 tag = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaInt8::Tag); - ui32 size = readable.readUnsignedVarint<i32>(); + ui32 size = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(size, sizeof(TKafkaInt8)); NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result); @@ -391,10 +428,10 @@ Y_UNIT_TEST(TKafkaString_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value); - i32 tag = readable.readUnsignedVarint<i32>(); + ui32 tag = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaString::Tag); - ui32 size = readable.readUnsignedVarint<i32>(); + ui32 size = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0 NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result); @@ -452,10 +489,10 @@ Y_UNIT_TEST(TKafkaArray_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value); - i32 tag = readable.readUnsignedVarint<i32>(); + ui32 tag = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag); - ui32 size = readable.readUnsignedVarint<i32>(); + ui32 size = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(size, v.length() // array element data + NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size + NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length @@ -518,10 +555,10 @@ Y_UNIT_TEST(TKafkaBytes_PresentVersion_TaggedVersion) { NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value); - i32 tag = readable.readUnsignedVarint<i32>(); + ui32 tag = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag); - ui32 size = readable.readUnsignedVarint<i32>(); + ui32 size = readable.readUnsignedVarint<ui32>(); UNIT_ASSERT_EQUAL(size, value->size() // byffer data + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0 ); |