aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <komels@ydb.tech>2025-04-23 17:23:54 +0300
committerGitHub <noreply@github.com>2025-04-23 17:23:54 +0300
commit5b143711f0ed0d12b1bbb59a0c8f33aa543da6f2 (patch)
tree481d0cf37f3d4e12963e21f4efb0a65fa304a4df
parentb38bd0dd54eff96d609088e35af32106df3e5807 (diff)
downloadydb-5b143711f0ed0d12b1bbb59a0c8f33aa543da6f2.tar.gz
Fix nullable value in TKafkaRecord (#17609)
-rw-r--r--ydb/core/kafka_proxy/kafka_records.h2
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp18
2 files changed, 19 insertions, 1 deletions
diff --git a/ydb/core/kafka_proxy/kafka_records.h b/ydb/core/kafka_proxy/kafka_records.h
index 9afa9c07124..e94c6888aff 100644
--- a/ydb/core/kafka_proxy/kafka_records.h
+++ b/ydb/core/kafka_proxy/kafka_records.h
@@ -165,7 +165,7 @@ public:
static constexpr TKafkaVersions PresentVersions = VersionsAlways;
static constexpr TKafkaVersions TaggedVersions = VersionsNever;
- static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
};
ValueMeta::Type Value;
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 64d1b7ab3c7..23d14409056 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -393,6 +393,24 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
AssertMessageMeta(readMessage, headerKey, headerValue);
}
+ // send empty produce message
+ {
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 3;
+ batch.BaseSequence = 5;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = TKafkaBytes{};
+ batch.Records[0].Value = TKafkaBytes{};
+
+ auto msg = client.Produce(topicName, 0, batch);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
{
// Check short topic name