diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-15 14:19:33 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-15 14:30:02 +0300 |
commit | cbd5243d6cb5a29e001b1d67ff44e182d2ed561e (patch) | |
tree | 34217d07ec5af89d14a5777ee7918b41748bd1cd | |
parent | b52a8ab5cd66952839ada0843539b5564108a052 (diff) | |
download | ydb-cbd5243d6cb5a29e001b1d67ff44e182d2ed561e.tar.gz |
[kafka] YT-21744: Support dynamic config and cosmetics
wip
e55ce2fefaf54ab6230b1ac5207e0dcda87645d5
-rw-r--r-- | yt/yt/client/kafka/requests-inl.h | 51 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 43 |
2 files changed, 55 insertions, 39 deletions
diff --git a/yt/yt/client/kafka/requests-inl.h b/yt/yt/client/kafka/requests-inl.h new file mode 100644 index 0000000000..ddb178e78b --- /dev/null +++ b/yt/yt/client/kafka/requests-inl.h @@ -0,0 +1,51 @@ +#ifndef REQUESTS_INL_H_ +#error "Direct inclusion of this file is not allowed, include requests.h" +// For the sake of sane code completion. +#include "requests.h" +#endif +#undef REQUESTS_INL_H_ + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +template <typename T, typename ...Args> +void Serialize(const std::vector<T>& data, IKafkaProtocolWriter* writer, bool isCompact, Args&&... args) +{ + if (isCompact) { + auto size = data.size(); + if constexpr (!std::is_same_v<T, TTaggedField>) { + ++size; + } + writer->WriteUnsignedVarInt(size); + } else { + writer->WriteInt32(data.size()); + } + for (const auto& item : data) { + item.Serialize(writer, args...); + } +} + +template <typename T, typename ...Args> +void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isCompact, Args&&...args) +{ + if (isCompact) { + auto size = reader->ReadUnsignedVarInt(); + if (size == 0) { + return; + } + if constexpr (!std::is_same_v<T, TTaggedField>) { + --size; + } + data.resize(size); + } else { + data.resize(reader->ReadInt32()); + } + for (auto& item : data) { + item.Deserialize(reader, args...); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index d3f44673e3..2399665d69 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -120,45 +120,6 @@ struct TRecord //////////////////////////////////////////////////////////////////////////////// -template <typename T, typename ...Args> -void Serialize(const std::vector<T>& data, IKafkaProtocolWriter* writer, bool isCompact, Args&&... args) -{ - if (isCompact) { - auto size = data.size(); - if constexpr (!std::is_same_v<T, TTaggedField>) { - ++size; - } - writer->WriteUnsignedVarInt(size); - } else { - writer->WriteInt32(data.size()); - } - for (const auto& item : data) { - item.Serialize(writer, args...); - } -} - -template <typename T, typename ...Args> -void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isCompact, Args&&...args) -{ - if (isCompact) { - auto size = reader->ReadUnsignedVarInt(); - if (size == 0) { - return; - } - if constexpr (!std::is_same_v<T, TTaggedField>) { - --size; - } - data.resize(size); - } else { - data.resize(reader->ReadInt32()); - } - for (auto& item : data) { - item.Deserialize(reader, args...); - } -} - -//////////////////////////////////////////////////////////////////////////////// - struct TReqApiVersions { static constexpr ERequestType RequestType = ERequestType::ApiVersions; @@ -655,3 +616,7 @@ struct TRspProduce //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NKafka + +#define REQUESTS_INL_H_ +#include "requests-inl.h" +#undef REQUESTS_INL_H_ |