aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-21 08:48:27 +0300
committertesseract <tesseract@yandex-team.com>2023-09-21 09:08:24 +0300
commit188f3e9e286e518a802c4da4122d4806b800cd6c (patch)
treea8572637e5b6bcc51d7661a2314e7131ce42cc56
parent504aa21c0d64dde3b5c5827174f65d9d5a4c10c9 (diff)
downloadydb-188f3e9e286e518a802c4da4122d4806b800cd6c.tar.gz
Cherry-picks Kafka commits from 23-3-12
-rw-r--r--ydb/core/kafka_proxy/kafka.h14
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp2
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h35
-rw-r--r--ydb/core/kafka_proxy/ut/ut_serialization.cpp53
4 files changed, 70 insertions, 34 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 72febe4430..bbd8c0be2c 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -333,7 +333,7 @@ public:
template<class T, typename U = std::make_unsigned_t<T>>
void writeUnsignedVarint(T v) {
- static constexpr T Mask = Max<T>() - 0x7F;
+ static constexpr U Mask = Max<U>() - 0x7F;
U value = v;
while ((value & Mask) != 0L) {
@@ -345,7 +345,7 @@ public:
write((const char*)&b, sizeof(b));
}
- template<class T, typename U = std::make_unsigned_t<T>>
+ template<std::signed_integral T, typename U = std::make_unsigned_t<T>>
void writeVarint(T value) {
writeUnsignedVarint<U>(AsUnsigned<T>(value));
}
@@ -376,15 +376,15 @@ public:
void read(char* val, size_t length);
char get();
- template<class T, typename U = std::make_unsigned_t<T>>
- T readUnsignedVarint() {
- static constexpr size_t MaxLength = (sizeof(T) << 3) - 4;
+ template<std::unsigned_integral U>
+ U readUnsignedVarint() {
+ static constexpr size_t MaxLength = (sizeof(U) * 8 - 1) / 7 * 7;
U value = 0;
size_t i = 0;
U b;
while (((b = static_cast<ui8>(get())) & 0x80) != 0) {
- if (i > MaxLength) {
+ if (i >= MaxLength) {
ythrow yexception() << "illegal varint length";
}
value |= (b & 0x7f) << i;
@@ -395,7 +395,7 @@ public:
return value;
}
- template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>>
+ template<std::signed_integral S, typename U = std::make_unsigned_t<S>>
S readVarint() {
U v = readUnsignedVarint<U>();
return (v >> 1) ^ -static_cast<S>(v & 1);
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index c381be6e45..6aaba8d911 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -49,10 +49,10 @@ public:
static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
TEvPollerReady* InactivityEvent = nullptr;
- TPollerToken::TPtr PollerToken;
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
+ TPollerToken::TPtr PollerToken;
TBufferedWriter Buffer;
THPTimer InactivityTimer;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 36c963221d..b33ec1fd61 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -29,7 +29,7 @@ struct TSizeCollector {
template<class T, typename U = std::make_unsigned_t<T>>
size_t SizeOfUnsignedVarint(T v) {
- static constexpr T Mask = Max<U>() - 0x7F;
+ static constexpr U Mask = Max<U>() - 0x7F;
U value = v;
size_t bytes = 1;
@@ -113,7 +113,7 @@ inline void WriteStringSize(TKafkaWritable& writable, TKafkaVersion version, TKa
template<typename Meta>
inline TKafkaInt32 ReadStringSize(TKafkaReadable& readable, TKafkaVersion version) {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return readable.readUnsignedVarint<TKafkaInt32>() - 1;
+ return readable.readUnsignedVarint<ui32>() - 1;
} else {
TKafkaInt16 v;
readable >> v;
@@ -137,7 +137,7 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version
if constexpr (SizeFormat<Meta>() == Varint) {
return readable.readVarint<TKafkaInt32>();
} else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return readable.readUnsignedVarint<TKafkaInt32>() - 1;
+ return readable.readUnsignedVarint<ui32>() - 1;
} else {
TKafkaInt32 v;
readable >> v;
@@ -145,6 +145,17 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version
}
}
+template<typename Meta>
+inline TKafkaInt32 ArraySize(TKafkaVersion version, TKafkaInt32 size) {
+ if constexpr (SizeFormat<Meta>() == Varint) {
+ return SizeOfVarint(size);
+ } else if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
+ return SizeOfUnsignedVarint(size + 1);
+ } else {
+ return sizeof(TKafkaInt32);
+ }
+}
+
inline IOutputStream& operator <<(IOutputStream& out, const TKafkaUuid& /*value*/) {
return out << "---";
@@ -426,11 +437,7 @@ public:
inline static i64 DoSize(TKafkaVersion version, const TKafkaBytes& value) {
if (value) {
const auto& v = *value;
- if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return v.size() + SizeOfUnsignedVarint(v.size() + 1);
- } else {
- return v.size() + sizeof(TKafkaInt32);
- }
+ return v.size() + ArraySize<Meta>(version, v.size());
} else {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
return 1;
@@ -501,11 +508,7 @@ public:
if (value) {
const auto& v = *value;
const auto size = v.Size(CURRENT_RECORD_VERSION);
- if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return size + SizeOfUnsignedVarint(size + 1);
- } else {
- return size + sizeof(TKafkaInt32);
- }
+ return size + ArraySize<Meta>(version, size);
} else {
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
return 1;
@@ -573,11 +576,7 @@ public:
size += ItemStrategy::DoSize(version, v);
}
}
- if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return size + SizeOfUnsignedVarint(value.size() + 1);
- } else {
- return size + sizeof(TKafkaInt32);
- }
+ return size + ArraySize<Meta>(version, value.size());
}
inline static void DoLog(const std::vector<TValueType>& value) {
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index 49eaf479e0..c45793fd30 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -244,6 +244,43 @@ Y_UNIT_TEST(Varint64) {
CheckVarint<i64>({Min<i64>(), Min<i32>(), -167966, -1, 0, 1, 127, 128, 32191, static_cast<unsigned long>(Max<i32>()) + 1, Max<i64>()});
}
+template<class T>
+void CheckVarint_WrongBytes(std::vector<ui8> bytes) {
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
+ TKafkaWritable writable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
+
+ writable.write((char*)bytes.data(), bytes.size());
+
+ try {
+ readable.readUnsignedVarint<T>();
+ UNIT_FAIL("Must be exception");
+ } catch(const yexception& e) {
+ UNIT_ASSERT_STRING_CONTAINS(e.what(), "illegal varint length");
+ }
+}
+
+Y_UNIT_TEST(UnsignedVarint32_Wrong) {
+ CheckVarint_WrongBytes<ui32>({0xFF, 0xFF, 0xFF, 0xFF, 0xFF});
+}
+
+Y_UNIT_TEST(UnsignedVarint64_Wrong) {
+ CheckVarint_WrongBytes<ui64>({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF});
+}
+
+Y_UNIT_TEST(UnsignedVarint32_Deserialize) {
+ std::vector<ui8> bytes = {0x81, 0x83, 0x05};
+
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
+ TKafkaWritable writable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
+
+ writable.write((char*)bytes.data(), bytes.size());
+
+ ui32 result = readable.readUnsignedVarint<ui32>();
+ UNIT_ASSERT_EQUAL(result, 1 + (3 << 7) + (5 << 14));
+}
+
#define SIMPLE_HEAD(Type_, Value) \
Meta_##Type_::Type value = Value; \
Meta_##Type_::Type result; \
@@ -305,10 +342,10 @@ Y_UNIT_TEST(TKafkaInt8_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint<i32>();
+ ui32 tag = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaInt8::Tag);
- ui32 size = readable.readUnsignedVarint<i32>();
+ ui32 size = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(size, sizeof(TKafkaInt8));
NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result);
@@ -391,10 +428,10 @@ Y_UNIT_TEST(TKafkaString_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint<i32>();
+ ui32 tag = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaString::Tag);
- ui32 size = readable.readUnsignedVarint<i32>();
+ ui32 size = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0
NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result);
@@ -452,10 +489,10 @@ Y_UNIT_TEST(TKafkaArray_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint<i32>();
+ ui32 tag = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag);
- ui32 size = readable.readUnsignedVarint<i32>();
+ ui32 size = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(size, v.length() // array element data
+ NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size
+ NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length
@@ -518,10 +555,10 @@ Y_UNIT_TEST(TKafkaBytes_PresentVersion_TaggedVersion) {
NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value);
- i32 tag = readable.readUnsignedVarint<i32>();
+ ui32 tag = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(tag, Meta_TKafkaArray::Tag);
- ui32 size = readable.readUnsignedVarint<i32>();
+ ui32 size = readable.readUnsignedVarint<ui32>();
UNIT_ASSERT_EQUAL(size, value->size() // byffer data
+ NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0
);