author    tesseract <tesseract@yandex-team.com>  2023-08-15 17:37:09 +0300
committer tesseract <tesseract@yandex-team.com>  2023-08-15 18:37:39 +0300
commit    364dc0cbc026def771916f339d77b5582a01de6f (patch)
tree      7540da78c1fdff3e771a4fd3de07eec8958d0bc9
parent    b011a40f88775f78a43c514953261f96b0d7be43 (diff)
download  ydb-364dc0cbc026def771916f339d77b5582a01de6f.tar.gz
Support Kafka Record format V0 and V1 (produce only)
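
For orientation, the wire layout this patch starts to accept is the pre-0.11 legacy "message set" entry (magic 0 and 1), in addition to the v2 record batch the proxy already understood. The sketch below is illustrative only; the names are not from the patch (the real structures are TKafkaRecordV0 / TKafkaRecordBatchV0 in kafka_records.h further down).

#include <cstdint>
#include <optional>
#include <string>

// Illustrative layout of one legacy message-set entry (not part of the patch).
struct TLegacyEntrySketch {
    int64_t Offset;                    // absolute message offset
    int32_t MessageSize;               // size of everything after this field
    int32_t Crc;                       // CRC32 over the remainder of the entry
    int8_t  Magic;                     // 0 = v0, 1 = v1
    int8_t  Attributes;                // low 3 bits: compression codec
    int64_t Timestamp;                 // v1 only; absent when Magic == 0
    std::optional<std::string> Key;    // int32 length prefix, -1 means null
    std::string Value;                 // int32 length prefix, then payload
};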
-rw-r--r--  ydb/core/kafka_proxy/kafka.h                  |   3
-rw-r--r--  ydb/core/kafka_proxy/kafka_connection.cpp     |   2
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages_int.cpp   |   5
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages_int.h     |  31
-rw-r--r--  ydb/core/kafka_proxy/kafka_records.cpp        |  90
-rw-r--r--  ydb/core/kafka_proxy/kafka_records.h          | 172
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_serialization.cpp  |  44
7 files changed, 338 insertions(+), 9 deletions(-)
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index df14f752d8..d085bb2d25 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -355,6 +355,9 @@ public:
i32 readVarint();
TArrayRef<const char> Bytes(size_t length);
+ // Returns the byte at the given offset from the current read position; the position itself is not advanced.
+ char take(size_t shift);
+
void skip(size_t length);
private:
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 2ff392fa41..92ddbb5e20 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -392,7 +392,7 @@ protected:
case SIZE_PREPARE:
NormalizeNumber(Request->ExpectedSize);
if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) {
- KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize);
+ KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: " << Context->Config.GetMaxMessageSize());
return PassAway();
}
Step = INFLIGTH_CHECK;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp
index 250924815b..f592c8e4e4 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp
@@ -94,6 +94,11 @@ void TKafkaReadable::skip(size_t length) {
Position += length;
}
+char TKafkaReadable::take(size_t shift) {
+ checkEof(shift + sizeof(char));
+ return *(Is.Data() + Position + shift);
+}
+
void TKafkaReadable::checkEof(size_t length) {
if (Position + length > Is.Size()) {
ythrow yexception() << "unexpected end of stream";
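
The new take() is a peek: it verifies that the byte at Position + shift is still inside the buffer and returns it without moving Position, so the subsequent Read() consumes the records blob from its start. A minimal standalone sketch of that contract; TPeekableStream is a stand-in, not the real TKafkaReadable.

#include <cstddef>
#include <stdexcept>
#include <string>

// Stand-in illustrating the peek contract of TKafkaReadable::take().
class TPeekableStream {
public:
    explicit TPeekableStream(std::string data) : Data(std::move(data)) {}

    // Peek at the byte `shift` positions ahead; the read position stays put.
    char Take(size_t shift) const {
        if (Position + shift + 1 > Data.size()) {
            throw std::runtime_error("unexpected end of stream");
        }
        return Data[Position + shift];
    }

    void Skip(size_t length) { Position += length; }

private:
    std::string Data;
    size_t Position = 0;
};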
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 95172b8228..890b19ede5 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -17,7 +17,6 @@ namespace NKafka {
namespace NPrivate {
static constexpr bool DEBUG_ENABLED = false;
-static constexpr TKafkaInt32 MAX_RECORDS_SIZE = 1 << 28; // 256Mb
struct TWriteCollector {
ui32 NumTaggedFields = 0;
@@ -447,11 +446,12 @@ public:
//
template<typename Meta>
class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
+ static constexpr TKafkaVersion CURRENT_RECORD_VERSION = 2;
public:
inline static void DoWrite(TKafkaWritable& writable, TKafkaVersion version, const TKafkaRecords& value) {
if (value) {
- WriteArraySize<Meta>(writable, version, DoSize(version, value));
- (*value).Write(writable, version);
+ WriteArraySize<Meta>(writable, version, DoSize(CURRENT_RECORD_VERSION, value));
+ (*value).Write(writable, CURRENT_RECORD_VERSION);
} else {
WriteArraySize<Meta>(writable, version, 0);
}
@@ -464,16 +464,35 @@ public:
inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TKafkaRecords& value) {
int length = ReadArraySize<Meta>(readable, version);
if (length > 0) {
+ char magic = readable.take(16);
value.emplace();
- (*value).Read(readable, version);
+
+ if (magic < CURRENT_RECORD_VERSION) {
+ TKafkaRecordBatchV0 v0;
+ v0.Read(readable, magic);
+
+ value->Magic = v0.Record.Magic;
+ value->Crc = v0.Record.Crc;
+ value->Attributes = v0.Record.Attributes & 0x07;
+
+ value->Records.resize(1);
+ auto& record = value->Records.front();
+ record.Length = v0.Record.MessageSize;
+ record.OffsetDelta = v0.Offset;
+ record.TimestampDelta = v0.Record.Timestamp;
+ record.Key = v0.Record.Key;
+ record.Value = v0.Record.Value;
+ } else {
+ (*value).Read(readable, magic);
+ }
} else {
value = std::nullopt;
}
}
- inline static i64 DoSize(TKafkaVersion version, const TKafkaRecords& value) {
+ inline static i64 DoSize(TKafkaVersion /*version*/, const TKafkaRecords& value) {
if (value) {
- return (*value).Size(version);
+ return (*value).Size(CURRENT_RECORD_VERSION);
} else {
return 0;
}
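
Why take(16) finds the magic byte regardless of format: both the legacy entry (offset + message_size + crc) and the v2 batch header (base_offset + batch_length + partition_leader_epoch) place exactly 16 bytes of fixed-width fields before the one-byte magic, so peeking at offset 16 is enough to choose between the legacy conversion path and the regular TKafkaRecordBatch::Read. A hedged compile-time sketch of that arithmetic:

#include <cstddef>
#include <cstdint>

// Bytes preceding "magic" in a legacy (v0/v1) message-set entry:
// offset(8) + message_size(4) + crc(4).
constexpr size_t LegacyMagicOffset = sizeof(int64_t) + 2 * sizeof(int32_t);

// Bytes preceding "magic" in a v2 record batch:
// base_offset(8) + batch_length(4) + partition_leader_epoch(4).
constexpr size_t V2MagicOffset = sizeof(int64_t) + 2 * sizeof(int32_t);

static_assert(LegacyMagicOffset == 16 && V2MagicOffset == 16,
              "the magic byte sits at byte 16 in both layouts");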
diff --git a/ydb/core/kafka_proxy/kafka_records.cpp b/ydb/core/kafka_proxy/kafka_records.cpp
index 5bb6a7fc30..13b4efdea9 100644
--- a/ydb/core/kafka_proxy/kafka_records.cpp
+++ b/ydb/core/kafka_proxy/kafka_records.cpp
@@ -196,4 +196,94 @@ i32 TKafkaRecordBatch::Size(TKafkaVersion _version) const {
return _collector.Size;
}
+
+
+//
+// TKafkaRecordV0
+//
+const TKafkaRecordV0::KeyMeta::Type TKafkaRecordV0::KeyMeta::Default = std::nullopt;
+
+TKafkaRecordV0::TKafkaRecordV0()
+ : MessageSize(MessageSizeMeta::Default)
+ , Crc(CrcMeta::Default)
+ , Magic(MagicMeta::Default)
+ , Attributes(AttributesMeta::Default)
+ , Timestamp(TimestampMeta::Default)
+ , Key(KeyMeta::Default) {
+}
+
+void TKafkaRecordV0::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TKafkaRecordV0";
+ }
+ NPrivate::Read<MessageSizeMeta>(_readable, _version, MessageSize);
+ NPrivate::Read<CrcMeta>(_readable, _version, Crc);
+ NPrivate::Read<MagicMeta>(_readable, _version, Magic);
+ NPrivate::Read<AttributesMeta>(_readable, _version, Attributes);
+ NPrivate::Read<TimestampMeta>(_readable, _version, Timestamp);
+ NPrivate::Read<KeyMeta>(_readable, _version, Key);
+ NPrivate::Read<ValueMeta>(_readable, _version, Value);
+}
+
+void TKafkaRecordV0::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TKafkaRecordV0";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MessageSizeMeta>(_collector, _writable, _version, MessageSize);
+ NPrivate::Write<CrcMeta>(_collector, _writable, _version, Crc);
+ NPrivate::Write<MagicMeta>(_collector, _writable, _version, Magic);
+ NPrivate::Write<AttributesMeta>(_collector, _writable, _version, Attributes);
+ NPrivate::Write<TimestampMeta>(_collector, _writable, _version, Timestamp);
+ NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key);
+ NPrivate::Write<ValueMeta>(_collector, _writable, _version, Value);
+}
+
+i32 TKafkaRecordV0::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MessageSizeMeta>(_collector, _version, MessageSize);
+ NPrivate::Size<CrcMeta>(_collector, _version, Crc);
+ NPrivate::Size<MagicMeta>(_collector, _version, Magic);
+ NPrivate::Size<AttributesMeta>(_collector, _version, Attributes);
+ NPrivate::Size<TimestampMeta>(_collector, _version, Timestamp);
+ NPrivate::Size<KeyMeta>(_collector, _version, Key);
+ NPrivate::Size<ValueMeta>(_collector, _version, Value);
+
+ return _collector.Size;
+}
+
+
+
+//
+// TKafkaRecordBatchV0
+//
+TKafkaRecordBatchV0::TKafkaRecordBatchV0()
+ : Offset(OffsetMeta::Default) {
+}
+
+void TKafkaRecordBatchV0::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TKafkaRecordBatchV0";
+ }
+ NPrivate::Read<OffsetMeta>(_readable, _version, Offset);
+ NPrivate::Read<RecordMeta>(_readable, _version, Record);
+}
+
+void TKafkaRecordBatchV0::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TKafkaRecordBatchV0";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<OffsetMeta>(_collector, _writable, _version, Offset);
+ NPrivate::Write<RecordMeta>(_collector, _writable, _version, Record);
+}
+
+i32 TKafkaRecordBatchV0::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<OffsetMeta>(_collector, _version, Offset);
+ NPrivate::Size<RecordMeta>(_collector, _version, Record);
+
+ return _collector.Size;
+}
+
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_records.h b/ydb/core/kafka_proxy/kafka_records.h
index fe8540f7ae..2a8e5b3aa5 100644
--- a/ydb/core/kafka_proxy/kafka_records.h
+++ b/ydb/core/kafka_proxy/kafka_records.h
@@ -70,7 +70,7 @@ public:
class TKafkaRecord: public TMessage {
public:
struct MessageMeta {
- static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions PresentVersions = TKafkaVersions(2, 2);
static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
};
@@ -197,7 +197,7 @@ public:
class TKafkaRecordBatch: public TMessage {
public:
struct MessageMeta {
- static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions PresentVersions = TKafkaVersions(2, 2);
static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
};
@@ -413,4 +413,172 @@ public:
bool HasDeleteHorizonMs();
};
+
+class TKafkaRecordV0: public TMessage {
+public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = TKafkaVersions(0, 1);
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TKafkaRecordV0();
+ ~TKafkaRecordV0() = default;
+
+ struct MessageSizeMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "messageSize";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ MessageSizeMeta::Type MessageSize;
+
+ struct CrcMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "CRC";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ CrcMeta::Type Crc;
+
+ struct MagicMeta {
+ using Type = TKafkaInt8;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "magic";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ MagicMeta::Type Magic;
+
+ struct AttributesMeta {
+ using Type = TKafkaInt8;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "attributes";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ AttributesMeta::Type Attributes;
+
+ struct TimestampMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "timestamp";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = TKafkaVersions(1, Max<TKafkaVersion>());
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ TimestampMeta::Type Timestamp;
+
+ struct KeyMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "key";
+ static constexpr const char* About = "";
+ static const Type Default; // = std::nullopt (defined in kafka_records.cpp)
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ KeyMeta::Type Key;
+
+ struct ValueMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "value";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ ValueMeta::Type Value;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TKafkaRecordV0& other) const = default;
+};
+
+class TKafkaRecordBatchV0: public TMessage {
+public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = TKafkaVersions(0, 1);
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TKafkaRecordBatchV0();
+ ~TKafkaRecordBatchV0() = default;
+
+ struct OffsetMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "offset";
+ static constexpr const char* About = "";
+ static constexpr Type Default = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ OffsetMeta::Type Offset;
+
+ struct RecordMeta {
+ using Type = TKafkaRecordV0;
+ using TypeDesc = NPrivate::TKafkaStructDesc;
+
+ static constexpr const char* Name = "records";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ RecordMeta::Type Record;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TKafkaRecordBatchV0& other) const = default;
+};
+
} // namespace NKafka
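
Design note: rather than teaching the rest of the produce path about legacy batches, DoRead in kafka_messages_int.h normalizes a parsed TKafkaRecordBatchV0 into the existing v2 in-memory TKafkaRecordBatch holding a single record. A hedged restatement of that mapping as a free function, using only the fields defined above; error handling omitted, and the real conversion lives inline in TypeStrategy<..., TKafkaRecords, ...>::DoRead.

TKafkaRecordBatch FromLegacyBatch(TKafkaReadable& readable, TKafkaVersion magic) {
    TKafkaRecordBatchV0 v0;
    v0.Read(readable, magic);                       // the magic byte (0 or 1) doubles as the version

    TKafkaRecordBatch batch;
    batch.Magic = v0.Record.Magic;
    batch.Crc = v0.Record.Crc;
    batch.Attributes = v0.Record.Attributes & 0x07; // keep only the compression-codec bits

    batch.Records.resize(1);                        // a legacy entry carries exactly one record
    auto& record = batch.Records.front();
    record.Length = v0.Record.MessageSize;
    record.OffsetDelta = v0.Offset;
    record.TimestampDelta = v0.Record.Timestamp;
    record.Key = v0.Record.Key;
    record.Value = v0.Record.Value;
    return batch;
}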
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index 5d492d5740..b58b92995c 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -658,6 +658,50 @@ Y_UNIT_TEST(ProduceRequestData) {
}
}
+Y_UNIT_TEST(ProduceRequestData_Record_v0) {
+ ui8 reference[] = {0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x05, 0x00, 0x07, 0x72, 0x64, 0x6B, 0x61, 0x66, 0x6B,
+ 0x61, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x75, 0x30, 0x00, 0x00, 0x00, 0x01, 0x00, 0x12, 0x2F,
+ 0x52, 0x6F, 0x6F, 0x74, 0x2F, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74, 0x6F, 0x70, 0x69, 0x63, 0x2D,
+ 0x31, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x1A, 0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1F, 0x20, 0x6F, 0x55, 0x26, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x05, 0x6B, 0x65, 0x79, 0x2D, 0x31, 0x00, 0x00, 0x00, 0x0C, 0x74, 0x65, 0x73, 0x74,
+ 0x20, 0x6D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65};
+
+ TBuffer buffer((char*)reference, sizeof(reference));
+ TKafkaReadable readable(buffer);
+
+ Cerr << ">>>>> Buffer size: " << buffer.Size() << Endl;
+
+ TRequestHeaderData header;
+ header.Read(readable, 1);
+
+ TProduceRequestData result;
+ result.Read(readable, header.RequestApiVersion);
+
+ UNIT_ASSERT_EQUAL(result.Acks, -1);
+ UNIT_ASSERT_EQUAL(result.TimeoutMs, 30000);
+
+ auto& r0 = *result.TopicData[0].PartitionData[0].Records;
+ UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
+ UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
+ UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, 0);
+ UNIT_ASSERT_EQUAL(r0.Magic, 0);
+ UNIT_ASSERT_EQUAL(r0.Crc, 544167206);
+ UNIT_ASSERT_EQUAL(r0.Attributes, 0);
+ UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
+ UNIT_ASSERT_EQUAL(r0.BaseTimestamp, 0);
+ UNIT_ASSERT_EQUAL(r0.MaxTimestamp, 0);
+ UNIT_ASSERT_EQUAL(r0.ProducerId, 0);
+ UNIT_ASSERT_EQUAL(r0.ProducerEpoch, 0);
+ UNIT_ASSERT_EQUAL(r0.BaseSequence, 0);
+
+ UNIT_ASSERT_EQUAL(r0.Records.size(), (size_t)1);
+
+ UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("key-1", 5));
+ UNIT_ASSERT_EQUAL(r0.Records[0].Value, TKafkaRawBytes("test message", 12));
+ UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);
+}
+
char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
}