aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-15 14:19:33 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-15 14:30:02 +0300
commitcbd5243d6cb5a29e001b1d67ff44e182d2ed561e (patch)
tree34217d07ec5af89d14a5777ee7918b41748bd1cd
parentb52a8ab5cd66952839ada0843539b5564108a052 (diff)
downloadydb-cbd5243d6cb5a29e001b1d67ff44e182d2ed561e.tar.gz
[kafka] YT-21744: Support dynamic config and cosmetics
wip e55ce2fefaf54ab6230b1ac5207e0dcda87645d5
-rw-r--r--yt/yt/client/kafka/requests-inl.h51
-rw-r--r--yt/yt/client/kafka/requests.h43
2 files changed, 55 insertions, 39 deletions
diff --git a/yt/yt/client/kafka/requests-inl.h b/yt/yt/client/kafka/requests-inl.h
new file mode 100644
index 0000000000..ddb178e78b
--- /dev/null
+++ b/yt/yt/client/kafka/requests-inl.h
@@ -0,0 +1,51 @@
+#ifndef REQUESTS_INL_H_
+#error "Direct inclusion of this file is not allowed, include requests.h"
+// For the sake of sane code completion.
+#include "requests.h"
+#endif
+#undef REQUESTS_INL_H_
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <typename T, typename ...Args>
+void Serialize(const std::vector<T>& data, IKafkaProtocolWriter* writer, bool isCompact, Args&&... args)
+{
+ if (isCompact) {
+ auto size = data.size();
+ if constexpr (!std::is_same_v<T, TTaggedField>) {
+ ++size;
+ }
+ writer->WriteUnsignedVarInt(size);
+ } else {
+ writer->WriteInt32(data.size());
+ }
+ for (const auto& item : data) {
+ item.Serialize(writer, args...);
+ }
+}
+
+template <typename T, typename ...Args>
+void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isCompact, Args&&...args)
+{
+ if (isCompact) {
+ auto size = reader->ReadUnsignedVarInt();
+ if (size == 0) {
+ return;
+ }
+ if constexpr (!std::is_same_v<T, TTaggedField>) {
+ --size;
+ }
+ data.resize(size);
+ } else {
+ data.resize(reader->ReadInt32());
+ }
+ for (auto& item : data) {
+ item.Deserialize(reader, args...);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index d3f44673e3..2399665d69 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -120,45 +120,6 @@ struct TRecord
////////////////////////////////////////////////////////////////////////////////
-template <typename T, typename ...Args>
-void Serialize(const std::vector<T>& data, IKafkaProtocolWriter* writer, bool isCompact, Args&&... args)
-{
- if (isCompact) {
- auto size = data.size();
- if constexpr (!std::is_same_v<T, TTaggedField>) {
- ++size;
- }
- writer->WriteUnsignedVarInt(size);
- } else {
- writer->WriteInt32(data.size());
- }
- for (const auto& item : data) {
- item.Serialize(writer, args...);
- }
-}
-
-template <typename T, typename ...Args>
-void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isCompact, Args&&...args)
-{
- if (isCompact) {
- auto size = reader->ReadUnsignedVarInt();
- if (size == 0) {
- return;
- }
- if constexpr (!std::is_same_v<T, TTaggedField>) {
- --size;
- }
- data.resize(size);
- } else {
- data.resize(reader->ReadInt32());
- }
- for (auto& item : data) {
- item.Deserialize(reader, args...);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
struct TReqApiVersions
{
static constexpr ERequestType RequestType = ERequestType::ApiVersions;
@@ -655,3 +616,7 @@ struct TRspProduce
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NKafka
+
+#define REQUESTS_INL_H_
+#include "requests-inl.h"
+#undef REQUESTS_INL_H_