author     tesseract <tesseract@yandex-team.com>    2023-08-15 17:37:09 +0300
committer  tesseract <tesseract@yandex-team.com>    2023-08-15 18:37:39 +0300
commit     364dc0cbc026def771916f339d77b5582a01de6f (patch)
tree       7540da78c1fdff3e771a4fd3de07eec8958d0bc9
parent     b011a40f88775f78a43c514953261f96b0d7be43 (diff)
download   ydb-364dc0cbc026def771916f339d77b5582a01de6f.tar.gz
Support Kafka Record format V0 and V1 (produce only)
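
Background for the change below: Kafka clients speaking pre-0.11 protocol versions send produce payloads as legacy "message sets" (magic byte 0 or 1) rather than v2 record batches. The patch peeks at the magic byte of the incoming records blob, parses legacy payloads with the new TKafkaRecordBatchV0/TKafkaRecordV0 readers, and maps the result onto the existing in-memory TKafkaRecordBatch, so the rest of the produce path is unchanged. The peek works because both layouts place the magic byte 16 bytes into the blob; below is a minimal sketch of that offset arithmetic, assuming the standard Kafka wire layout (identifier names are illustrative, not taken from this patch).

#include <cstddef>
#include <cstdint>

// Legacy message set (magic 0/1): offset | message_size | crc | magic | ...
constexpr size_t LegacyMagicOffset =
      sizeof(int64_t)   // offset
    + sizeof(int32_t)   // message_size
    + sizeof(int32_t);  // crc

// v2 record batch: base_offset | batch_length | partition_leader_epoch | magic | ...
constexpr size_t BatchMagicOffset =
      sizeof(int64_t)   // base_offset
    + sizeof(int32_t)   // batch_length
    + sizeof(int32_t);  // partition_leader_epoch

// Both layouts put the magic byte at offset 16, which is why a single
// readable.take(16) can distinguish the formats without consuming input.
static_assert(LegacyMagicOffset == 16 && BatchMagicOffset == 16, "magic byte offset");

In both the legacy and the v2 attributes field the low three bits carry the compression codec, which is why the conversion in DoRead keeps only Attributes & 0x07.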
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h                 |   3
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp    |   2
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.cpp  |   5
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.h    |  31
-rw-r--r-- | ydb/core/kafka_proxy/kafka_records.cpp       |  90
-rw-r--r-- | ydb/core/kafka_proxy/kafka_records.h         | 172
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_serialization.cpp |  44
7 files changed, 338 insertions, 9 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index df14f752d8..d085bb2d25 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -355,6 +355,9 @@ public:
     i32 readVarint();
     TArrayRef<const char> Bytes(size_t length);
 
+    // returns a character from the specified position. The current position does not change.
+    char take(size_t shift);
+
     void skip(size_t length);
 
 private:
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 2ff392fa41..92ddbb5e20 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -392,7 +392,7 @@ protected:
             case SIZE_PREPARE:
                 NormalizeNumber(Request->ExpectedSize);
                 if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) {
-                    KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize);
+                    KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: " << Context->Config.GetMaxMessageSize());
                     return PassAway();
                 }
                 Step = INFLIGTH_CHECK;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp
index 250924815b..f592c8e4e4 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp
@@ -94,6 +94,11 @@ void TKafkaReadable::skip(size_t length) {
     Position += length;
 }
 
+char TKafkaReadable::take(size_t shift) {
+    checkEof(shift + sizeof(char));
+    return *(Is.Data() + Position + shift);
+}
+
 void TKafkaReadable::checkEof(size_t length) {
     if (Position + length > Is.Size()) {
         ythrow yexception() << "unexpected end of stream";
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 95172b8228..890b19ede5 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -17,7 +17,6 @@ namespace NKafka {
 namespace NPrivate {
 
 static constexpr bool DEBUG_ENABLED = false;
-static constexpr TKafkaInt32 MAX_RECORDS_SIZE = 1 << 28; // 256Mb
 
 struct TWriteCollector {
     ui32 NumTaggedFields = 0;
@@ -447,11 +446,12 @@ public:
 //
 template<typename Meta>
 class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
+    static constexpr TKafkaVersion CURRENT_RECORD_VERSION = 2;
 public:
     inline static void DoWrite(TKafkaWritable& writable, TKafkaVersion version, const TKafkaRecords& value) {
         if (value) {
-            WriteArraySize<Meta>(writable, version, DoSize(version, value));
-            (*value).Write(writable, version);
+            WriteArraySize<Meta>(writable, version, DoSize(CURRENT_RECORD_VERSION, value));
+            (*value).Write(writable, CURRENT_RECORD_VERSION);
         } else {
             WriteArraySize<Meta>(writable, version, 0);
         }
@@ -464,16 +464,35 @@ public:
     inline static void DoRead(TKafkaReadable& readable, TKafkaVersion version, TKafkaRecords& value) {
         int length = ReadArraySize<Meta>(readable, version);
         if (length > 0) {
+            char magic = readable.take(16);
             value.emplace();
-            (*value).Read(readable, version);
+
+            if (magic < CURRENT_RECORD_VERSION) {
+                TKafkaRecordBatchV0 v0;
+                v0.Read(readable, magic);
+
+                value->Magic = v0.Record.Magic;
+                value->Crc = v0.Record.Crc;
+                value->Attributes = v0.Record.Attributes & 0x07;
+
+                value->Records.resize(1);
+                auto& record = value->Records.front();
+                record.Length = v0.Record.MessageSize;
+                record.OffsetDelta = v0.Offset;
+                record.TimestampDelta = v0.Record.Timestamp;
+                record.Key = v0.Record.Key;
+                record.Value = v0.Record.Value;
+            } else {
+                (*value).Read(readable, magic);
+            }
         } else {
             value = std::nullopt;
         }
     }
 
-    inline static i64 DoSize(TKafkaVersion version, const TKafkaRecords& value) {
+    inline static i64 DoSize(TKafkaVersion /*version*/, const TKafkaRecords& value) {
         if (value) {
-            return (*value).Size(version);
+            return (*value).Size(CURRENT_RECORD_VERSION);
         } else {
             return 0;
         }
diff --git a/ydb/core/kafka_proxy/kafka_records.cpp b/ydb/core/kafka_proxy/kafka_records.cpp
index 5bb6a7fc30..13b4efdea9 100644
--- a/ydb/core/kafka_proxy/kafka_records.cpp
+++ b/ydb/core/kafka_proxy/kafka_records.cpp
@@ -196,4 +196,94 @@ i32 TKafkaRecordBatch::Size(TKafkaVersion _version) const {
 
     return _collector.Size;
 }
+
+
+//
+// TKafkaRecordV0
+//
+const TKafkaRecordV0::KeyMeta::Type TKafkaRecordV0::KeyMeta::Default = std::nullopt;
+
+TKafkaRecordV0::TKafkaRecordV0()
+        : MessageSize(MessageSizeMeta::Default)
+        , Crc(CrcMeta::Default)
+        , Magic(MagicMeta::Default)
+        , Attributes(AttributesMeta::Default)
+        , Timestamp(TimestampMeta::Default)
+        , Key(KeyMeta::Default) {
+}
+
+void TKafkaRecordV0::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+    if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+        ythrow yexception() << "Can't read version " << _version << " of TKafkaRecordV0";
+    }
+    NPrivate::Read<MessageSizeMeta>(_readable, _version, MessageSize);
+    NPrivate::Read<CrcMeta>(_readable, _version, Crc);
+    NPrivate::Read<MagicMeta>(_readable, _version, Magic);
+    NPrivate::Read<AttributesMeta>(_readable, _version, Attributes);
+    NPrivate::Read<TimestampMeta>(_readable, _version, Timestamp);
+    NPrivate::Read<KeyMeta>(_readable, _version, Key);
+    NPrivate::Read<ValueMeta>(_readable, _version, Value);
+}
+
+void TKafkaRecordV0::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+    if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+        ythrow yexception() << "Can't write version " << _version << " of TKafkaRecordV0";
+    }
+    NPrivate::TWriteCollector _collector;
+    NPrivate::Write<MessageSizeMeta>(_collector, _writable, _version, MessageSize);
+    NPrivate::Write<CrcMeta>(_collector, _writable, _version, Crc);
+    NPrivate::Write<MagicMeta>(_collector, _writable, _version, Magic);
+    NPrivate::Write<AttributesMeta>(_collector, _writable, _version, Attributes);
+    NPrivate::Write<TimestampMeta>(_collector, _writable, _version, Timestamp);
+    NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key);
+    NPrivate::Write<ValueMeta>(_collector, _writable, _version, Value);
+}
+
+i32 TKafkaRecordV0::Size(TKafkaVersion _version) const {
+    NPrivate::TSizeCollector _collector;
+    NPrivate::Size<MessageSizeMeta>(_collector, _version, MessageSize);
+    NPrivate::Size<CrcMeta>(_collector, _version, Crc);
+    NPrivate::Size<MagicMeta>(_collector, _version, Magic);
+    NPrivate::Size<AttributesMeta>(_collector, _version, Attributes);
+    NPrivate::Size<TimestampMeta>(_collector, _version, Timestamp);
+    NPrivate::Size<KeyMeta>(_collector, _version, Key);
+    NPrivate::Size<ValueMeta>(_collector, _version, Value);
+
+    return _collector.Size;
+}
+
+
+
+//
+// TKafkaRecordV0
+//
+TKafkaRecordBatchV0::TKafkaRecordBatchV0()
+        : Offset(OffsetMeta::Default) {
+}
+
+void TKafkaRecordBatchV0::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+    if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+        ythrow yexception() << "Can't read version " << _version << " of TKafkaRecordBatchV0";
+    }
+    NPrivate::Read<OffsetMeta>(_readable, _version, Offset);
+    NPrivate::Read<RecordMeta>(_readable, _version, Record);
+}
+
+void TKafkaRecordBatchV0::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+    if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+        ythrow yexception() << "Can't write version " << _version << " of TKafkaRecordBatchV0";
+    }
+    NPrivate::TWriteCollector _collector;
+    NPrivate::Write<OffsetMeta>(_collector, _writable, _version, Offset);
+    NPrivate::Write<RecordMeta>(_collector, _writable, _version, Record);
+}
+
+i32 TKafkaRecordBatchV0::Size(TKafkaVersion _version) const {
+    NPrivate::TSizeCollector _collector;
+    NPrivate::Size<OffsetMeta>(_collector, _version, Offset);
+    NPrivate::Size<RecordMeta>(_collector, _version, Record);
+
+    return _collector.Size;
+}
+
 } // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_records.h b/ydb/core/kafka_proxy/kafka_records.h
index fe8540f7ae..2a8e5b3aa5 100644
--- a/ydb/core/kafka_proxy/kafka_records.h
+++ b/ydb/core/kafka_proxy/kafka_records.h
@@ -70,7 +70,7 @@ public:
 class TKafkaRecord: public TMessage {
 public:
     struct MessageMeta {
-        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions PresentVersions = TKafkaVersions(2, 2);
         static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
     };
 
@@ -197,7 +197,7 @@ public:
 class TKafkaRecordBatch: public TMessage {
 public:
     struct MessageMeta {
-        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions PresentVersions = TKafkaVersions(2, 2);
         static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
     };
 
@@ -413,4 +413,172 @@ public:
     bool HasDeleteHorizonMs();
 };
 
+
+class TKafkaRecordV0: public TMessage {
+public:
+    struct MessageMeta {
+        static constexpr TKafkaVersions PresentVersions = TKafkaVersions(0, 1);
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+
+    TKafkaRecordV0();
+    ~TKafkaRecordV0() = default;
+
+    struct MessageSizeMeta {
+        using Type = TKafkaInt32;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "messageSize";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+    };
+    MessageSizeMeta::Type MessageSize;
+
+    struct CrcMeta {
+        using Type = TKafkaInt32;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "CRC";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+    };
+    CrcMeta::Type Crc;
+
+    struct MagicMeta {
+        using Type = TKafkaInt8;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "magic";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+    };
+    MagicMeta::Type Magic;
+
+    struct AttributesMeta {
+        using Type = TKafkaInt8;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "attributes";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+    };
+    AttributesMeta::Type Attributes;
+
+    struct TimestampMeta {
+        using Type = TKafkaInt64;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "timestamp";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = TKafkaVersions(1, Max<TKafkaVersion>());
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+    };
+    TimestampMeta::Type Timestamp;
+
+    struct KeyMeta {
+        using Type = TKafkaBytes;
+        using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+        static constexpr const char* Name = "key";
+        static constexpr const char* About = "";
+        static const Type Default; // = {""};
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+    KeyMeta::Type Key;
+
+    struct ValueMeta {
+        using Type = TKafkaBytes;
+        using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+        static constexpr const char* Name = "value";
+        static constexpr const char* About = "";
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+    ValueMeta::Type Value;
+
+    i32 Size(TKafkaVersion version) const override;
+    void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+    void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+    bool operator==(const TKafkaRecordV0& other) const = default;
+};
+
+class TKafkaRecordBatchV0: public TMessage {
+public:
+    struct MessageMeta {
+        static constexpr TKafkaVersions PresentVersions = TKafkaVersions(0, 1);
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+
+    TKafkaRecordBatchV0();
+    ~TKafkaRecordBatchV0() = default;
+
+    struct OffsetMeta {
+        using Type = TKafkaInt64;
+        using TypeDesc = NPrivate::TKafkaIntDesc;
+
+        static constexpr const char* Name = "offset";
+        static constexpr const char* About = "";
+        static constexpr Type Default = 0;
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+    OffsetMeta::Type Offset;
+
+    struct RecordMeta {
+        using Type = TKafkaRecordV0;
+        using TypeDesc = NPrivate::TKafkaStructDesc;
+
+        static constexpr const char* Name = "records";
+        static constexpr const char* About = "";
+
+        static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+        static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+        static constexpr TKafkaVersions NullableVersions = VersionsNever;
+        static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+    };
+    RecordMeta::Type Record;
+
+    i32 Size(TKafkaVersion version) const override;
+    void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+    void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+    bool operator==(const TKafkaRecordBatchV0& other) const = default;
+};
+
 } // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index 5d492d5740..b58b92995c 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -658,6 +658,50 @@ Y_UNIT_TEST(ProduceRequestData) {
     }
 }
 
+Y_UNIT_TEST(ProduceRequestData_Record_v0) {
+    ui8 reference[] = {0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x05, 0x00, 0x07, 0x72, 0x64, 0x6B, 0x61, 0x66, 0x6B,
+                       0x61, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x75, 0x30, 0x00, 0x00, 0x00, 0x01, 0x00, 0x12, 0x2F,
+                       0x52, 0x6F, 0x6F, 0x74, 0x2F, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74, 0x6F, 0x70, 0x69, 0x63, 0x2D,
+                       0x31, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x1A, 0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x00,
+                       0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1F, 0x20, 0x6F, 0x55, 0x26, 0x00, 0x00, 0x00,
+                       0x00, 0x00, 0x05, 0x6B, 0x65, 0x79, 0x2D, 0x31, 0x00, 0x00, 0x00, 0x0C, 0x74, 0x65, 0x73, 0x74,
+                       0x20, 0x6D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65};
+
+    TBuffer buffer((char*)reference, sizeof(reference));
+    TKafkaReadable readable(buffer);
+
+    Cerr << ">>>>> Buffer size: " << buffer.Size() << Endl;
+
+    TRequestHeaderData header;
+    header.Read(readable, 1);
+
+    TProduceRequestData result;
+    result.Read(readable, header.RequestApiVersion);
+
+    UNIT_ASSERT_EQUAL(result.Acks, -1);
+    UNIT_ASSERT_EQUAL(result.TimeoutMs, 30000);
+
+    auto& r0 = *result.TopicData[0].PartitionData[0].Records;
+    UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
+    UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
+    UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, 0);
+    UNIT_ASSERT_EQUAL(r0.Magic, 0);
+    UNIT_ASSERT_EQUAL(r0.Crc, 544167206);
+    UNIT_ASSERT_EQUAL(r0.Attributes, 0);
+    UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
+    UNIT_ASSERT_EQUAL(r0.BaseTimestamp, 0);
+    UNIT_ASSERT_EQUAL(r0.MaxTimestamp, 0);
+    UNIT_ASSERT_EQUAL(r0.ProducerId, 0);
+    UNIT_ASSERT_EQUAL(r0.ProducerEpoch, 0);
+    UNIT_ASSERT_EQUAL(r0.BaseSequence, 0);
+
+    UNIT_ASSERT_EQUAL(r0.Records.size(), (size_t)1);
+
+    UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("key-1", 5));
+    UNIT_ASSERT_EQUAL(r0.Records[0].Value, TKafkaRawBytes("test message", 12));
+    UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);
+}
+
 char Hex(const unsigned char c) {
     return c < 10 ? '0' + c : 'A' + c - 10;
 }
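
For reference, a byte-level reading of the test buffer in ProduceRequestData_Record_v0, assuming the standard Kafka request header v1 and Produce request v7 wire layouts (the annotations are editorial, not part of the patch):

// 00 00                          api_key          = 0 (Produce)
// 00 07                          api_version      = 7
// 00 00 00 05                    correlation_id   = 5
// 00 07 72 64 6B 61 66 6B 61     client_id        = "rdkafka"
// FF FF                          transactional_id = null
// FF FF                          acks             = -1
// 00 00 75 30                    timeout_ms       = 30000
// 00 00 00 01                    topic_data: 1 entry
// 00 12 2F ... 31                topic name       = "/Root/test/topic-1"
// 00 00 00 01                    partition_data: 1 entry
// 00 00 00 1A                    partition index  = 26
// 00 00 00 2B                    records size     = 43 bytes
//   00 00 00 00 00 00 00 00      offset           = 0
//   00 00 00 1F                  message_size     = 31
//   20 6F 55 26                  crc              = 544167206
//   00                           magic            = 0
//   00                           attributes       = 0
//   00 00 00 05 6B 65 79 2D 31   key              = "key-1" (4-byte length + bytes)
//   00 00 00 0C 74 65 73 74 ...  value            = "test message"

The asserted values in the test (Acks = -1, TimeoutMs = 30000, Crc = 544167206, key "key-1", value "test message") line up with this reading; the magic byte of 0 is what routes the payload through the new TKafkaRecordBatchV0 parser.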