diff options
author | tesseract <tesseract@yandex-team.com> | 2023-06-23 15:38:51 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-06-23 15:38:51 +0300 |
commit | 946e3fceb61ca8aea25cc61c39c3d34a60c508fe (patch) | |
tree | 3ee102f4f801da842b2bf44d5ab7baabf7d8f136 | |
parent | d37449771450cd5e4c8fce14ee366a68da87dcb8 (diff) | |
download | ydb-946e3fceb61ca8aea25cc61c39c3d34a60c508fe.tar.gz |
Kafka protocol - limit message size
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h | 112 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 234 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.h | 8 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_listener.h | 6 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.cpp | 5255 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.h | 132 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.cpp | 60 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.h | 335 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_serialization.cpp | 296 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 4 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_impl.h | 47 |
12 files changed, 1003 insertions, 5488 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 91b339aa37..24672d25ae 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2871,7 +2871,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu setup->LocalServices.emplace_back( TActorId(), - TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings), + TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()), TMailboxType::HTSwap, appData->UserPoolId) ); } diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index 8e4f5651b2..24429079cc 100644 --- a/ydb/core/kafka_proxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h @@ -4,13 +4,13 @@ #include <optional> #include <ostream> +#include <ydb/core/raw_socket/sock_impl.h> #include <ydb/library/yql/public/decimal/yql_wide_int.h> #include <util/generic/buffer.h> #include <util/generic/strbuf.h> #include <util/system/types.h> - namespace NKafka { /* @@ -48,30 +48,36 @@ using TKafkaUuid = NYql::TWide<ui64>; using TKafkaFloat64 = double; using TKafkaRawString = TString; using TKafkaString = std::optional<TKafkaRawString>; -using TKafkaRawBytes = TBuffer; +using TKafkaRawBytes = TArrayRef<const char>; using TKafkaBytes = std::optional<TKafkaRawBytes>; using TKafkaRecords = std::optional<TKafkaRawBytes>; - using TKafkaVersion = i16; - -void ErrorOnUnexpectedEnd(std::istream& is); +using TWritableBuf = NKikimr::NRawSocket::TBufferedWriter; + +template <typename T> +void NormalizeNumber(T& value) { +#ifndef WORDS_BIGENDIAN + char* b = (char*)&value; + char* e = b + sizeof(T) - 1; + while (b < e) { + std::swap(*b, *e); + ++b; + --e; + } +#endif +} class TKafkaWritable { public: - TKafkaWritable(std::ostream& os) : Os(os) {}; + TKafkaWritable(TWritableBuf& buffer) + : Buffer(buffer){}; - template<typename T> + template <typename T> TKafkaWritable& operator<<(const T val) { - char* v = (char*)&val; -#ifdef WORDS_BIGENDIAN - Os.write(v, sizeof(T)); -#else - for(i8 i = sizeof(T) - 1; 0 <= i; --i) { - Os.write(v + i, sizeof(char)); - } -#endif + NormalizeNumber(val); + write((const char*)&val, sizeof(T)); return *this; }; @@ -82,62 +88,71 @@ public: void writeUnsignedVarint(TKafkaUint32 val); void writeVarint(TKafkaInt32 val); void writeVarint(TKafkaInt64 val); + void write(const char* val, size_t length); private: - std::ostream& Os; + TWritableBuf& Buffer; }; class TKafkaReadable { public: - TKafkaReadable(std::istream& is): Is(is) {}; + TKafkaReadable(const TBuffer& is) + : Is(is) + , Position(0) { + } - template<typename T> + template <typename T> TKafkaReadable& operator>>(T& val) { char* v = (char*)&val; -#ifdef WORDS_BIGENDIAN - Is.read(v, sizeof(T)); -#else - for(i8 i = sizeof(T) - 1; 0 <= i; --i) { - Is.read(v + i, sizeof(char)); - } -#endif - ErrorOnUnexpectedEnd(Is); + read(v, sizeof(T)); + NormalizeNumber(val); return *this; }; TKafkaReadable& operator>>(TKafkaUuid& val); - void read(char* val, int length); + void read(char* val, size_t length); + char get(); ui32 readUnsignedVarint(); + TArrayRef<const char> Bytes(size_t length); - void skip(int length); + void skip(size_t length); private: - std::istream& Is; -}; + void checkEof(size_t length); + const TBuffer& Is; + size_t Position; +}; struct TReadDemand { constexpr TReadDemand() : Buffer(nullptr) - , Length(0) - {} + , Length(0) { + } constexpr TReadDemand(char* buffer, size_t length) : Buffer(buffer) - , Length(length) - {} + , Length(length) { + } constexpr TReadDemand(size_t length) : Buffer(nullptr) - , Length(length) - {} - - char* GetBuffer() const { return Buffer; } - size_t GetLength() const { return Length; } - explicit operator bool() const { return 0 < Length; } - bool Skip() const { return nullptr == Buffer; } - + , Length(length) { + } + + char* GetBuffer() const { + return Buffer; + } + size_t GetLength() const { + return Length; + } + explicit operator bool() const { + return 0 < Length; + } + bool Skip() const { + return nullptr == Buffer; + } char* Buffer; size_t Length; @@ -145,33 +160,24 @@ struct TReadDemand { static constexpr TReadDemand NoDemand; -class TReadContext { -public: - virtual ~TReadContext() = default; - virtual TReadDemand Next() = 0; -}; - - class TMessage { public: virtual ~TMessage() = default; virtual i32 Size(TKafkaVersion version) const = 0; - //virtual void Read(TKafkaReadable& readable, TKafkaVersion version) = 0; + virtual void Read(TKafkaReadable& readable, TKafkaVersion version) = 0; virtual void Write(TKafkaWritable& writable, TKafkaVersion version) const = 0; - virtual std::unique_ptr<TReadContext> CreateReadContext(TKafkaVersion version) = 0; bool operator==(const TMessage& other) const = default; }; -class TApiMessage : public TMessage { +class TApiMessage: public TMessage { public: ~TApiMessage() = default; virtual i16 ApiKey() const = 0; }; - std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey); std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey); diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index f9934a1a8d..44c507b88f 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -33,14 +33,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet public: using TBase = TActorBootstrapped<TKafkaConnection>; - // TODO check standard ip packet MTU. On my desktop it is 1500 on eth and wlp interfaces. It is 1300 on the tun interface. It - // is 1500 and 8950 on the dev server interfaces. - static constexpr size_t BufferSize = 8950; - static constexpr size_t MinDirectSize = 256; - struct Msg { size_t Size = 0; TKafkaInt32 ExpectedSize = 0; + TBuffer Buffer; TRequestHeaderData Header; std::unique_ptr<TMessage> Message; }; @@ -51,6 +47,8 @@ public: TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; + const NKikimrConfig::TKafkaProxyConfig& Config; + THPTimer InactivityTimer; bool IsAuthRequired = true; @@ -59,32 +57,25 @@ public: bool ConnectionEstablished = false; bool CloseConnection = false; - TBuffer Buffer; - size_t Length; - size_t Position; - Msg Request; - bool HeaderSizeWasRead; - bool HeaderWasRead; - bool MessageSizeWasRead; - bool MessageWasRead; - std::unique_ptr<TReadContext> Ctx; + + enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS }; + EReadSteps Step; TReadDemand Demand; - TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) + size_t InflightSize; + + TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, + const NKikimrConfig::TKafkaProxyConfig& config) : Socket(std::move(socket)) , Address(address) - , Length(0) - , Position(0) - , HeaderSizeWasRead(false) - , HeaderWasRead(false) - , MessageSizeWasRead(false) - , MessageWasRead(false) - , Demand(NoDemand) { + , Config(config) + , Step(SIZE_READ) + , Demand(NoDemand) + , InflightSize(0) { SetNonBlock(); IsSslSupported = IsSslSupported && Socket->IsSslSupported(); - Buffer.Resize(BufferSize); } void Bootstrap() { @@ -155,7 +146,7 @@ protected: } } - void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/) { + void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/, size_t messageSize) { TApiVersionsResponseData response; response.ApiKeys.resize(4); @@ -176,9 +167,11 @@ protected: response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMax; Reply(header, &response); + + InflightSize -= messageSize; } - void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message) { + void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message, size_t messageSize) { TProduceResponseData response; response.Responses.resize(message->TopicData.size()); int i = 0; @@ -196,9 +189,11 @@ protected: } Reply(header, &response); + + InflightSize -= messageSize; } - void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/) { + void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/, size_t messageSize) { TInitProducerIdResponseData response; response.ProducerEpoch = 1; response.ProducerId = 1; @@ -206,9 +201,11 @@ protected: response.ThrottleTimeMs = 0; Reply(header, &response); + + InflightSize -= messageSize; } - void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/) { + void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/, size_t messageSize) { TMetadataResponseData response; response.ThrottleTimeMs = 0; response.ClusterId = "cluster-ahjgk"; @@ -230,6 +227,8 @@ protected: response.Topics[0].Partitions[0].IsrNodes[0] = 1; Reply(header, &response); + + InflightSize -= messageSize; } void ProcessRequest() { @@ -237,19 +236,19 @@ protected: << ", Size=" << Request.Size); switch (Request.Header.RequestApiKey) { case PRODUCE: - HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get())); + HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()), Request.ExpectedSize); return; case API_VERSIONS: - HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get())); + HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()), Request.ExpectedSize); return; case INIT_PRODUCER_ID: - HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get())); + HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()), Request.ExpectedSize); return; case METADATA: - HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get())); + HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()), Request.ExpectedSize); return; default: @@ -267,115 +266,101 @@ protected: TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version); - std::stringstream sb; - TKafkaWritable writable(sb); + TBufferedWriter buffer(Socket.Get(), Config.GetPacketSize()); + TKafkaWritable writable(buffer); + writable << size; responseHeader.Write(writable, headerVersion); reply->Write(writable, version); - TBuffer b; - b.Reserve(size + sizeof(size)); - sb.read(b.data(), size + sizeof(size)); - - Print("sent", b, size + sizeof(size)); - - SocketSend(b.Data(), size + sizeof(size)); + buffer.flush(); } void DoRead() { for (;;) { while (Demand) { ssize_t received = 0; - if (Position < Length) { - KAFKA_LOG_T("Read from buffer: Position=" << Position << ", Length=" << Length - << ", Demand=" << Demand.GetLength()); - received = std::min(Demand.Length, Length - Position); - if (!Demand.Skip()) { - memcpy(Demand.Buffer, Buffer.Data() + Position, received); - } - Position += received; - } else if (!Demand.Skip() && Demand.Length >= MinDirectSize) { - ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength()); - if (-res == EAGAIN || -res == EWOULDBLOCK) { - return; - } else if (-res == EINTR) { - continue; - } else if (!res) { - KAFKA_LOG_ERROR("connection closed"); - return PassAway(); - } else if (res < 0) { - KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); - return PassAway(); - } - received = res; - if (!received) { - return; - } - } else { - Position = 0; - Length = 0; - ssize_t res = SocketReceive(Buffer.Data(), BufferSize); - if (-res == EAGAIN || -res == EWOULDBLOCK) { - return; - } else if (-res == EINTR) { - continue; - } else if (!res) { - KAFKA_LOG_ERROR("connection closed"); - return PassAway(); - } else if (res < 0) { - KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); - return PassAway(); - } - Length = res; - Print("received", Buffer, Length); - if (!Length) { - return; - } - + ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength()); + if (-res == EAGAIN || -res == EWOULDBLOCK) { + return; + } else if (-res == EINTR) { continue; + } else if (!res) { + KAFKA_LOG_ERROR("connection closed"); + return PassAway(); + } else if (res < 0) { + KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); + return PassAway(); + } + received = res; + if (!received) { + return; } Request.Size += received; Demand.Buffer += received; Demand.Length -= received; } - if (Ctx) { - Demand = Ctx->Next(); - } if (!Demand) { - if (MessageWasRead) { - HeaderSizeWasRead = false; - MessageWasRead = false; - HeaderWasRead = false; - - ProcessRequest(); - - Request = Msg(); - Ctx = nullptr; - } - if (!HeaderSizeWasRead) { - Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize)); - HeaderSizeWasRead = true; - Ctx = nullptr; - } else if (!HeaderWasRead) { - NPrivate::NormalizeNumber(Request.ExpectedSize); - - KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize); - - Ctx = Request.Header.CreateReadContext(2); - - HeaderWasRead = true; - } else { - KAFKA_LOG_T("received header. ApiKey=" << Request.Header.RequestApiKey - << ", Version=" << Request.Header.RequestApiVersion); - - i16 apiKey = Request.Header.RequestApiKey; - TKafkaVersion version = Request.Header.RequestApiVersion; - - Request.Message = CreateRequest(apiKey); - Ctx = Request.Message->CreateReadContext(version); - - MessageWasRead = true; + switch (Step) { + case SIZE_READ: + Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize)); + Step = SIZE_PREPARE; + break; + + case SIZE_PREPARE: + NormalizeNumber(Request.ExpectedSize); + if ((ui64)Request.ExpectedSize > Config.GetMaxMessageSize()) { + KAFKA_LOG_ERROR("message is big. Size: " << Request.ExpectedSize); + return PassAway(); + } + Step = INFLIGTH_CHECK; + + case INFLIGTH_CHECK: + if (InflightSize + Request.ExpectedSize > Config.GetMaxInflightSize()) { + return; + } + InflightSize += Request.ExpectedSize; + Step = MESSAGE_READ; + + case MESSAGE_READ: + KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize); + + Request.Buffer.Resize(Request.ExpectedSize); + Demand = TReadDemand(Request.Buffer.Data(), Request.ExpectedSize); + + Step = MESSAGE_PROCESS; + break; + + case MESSAGE_PROCESS: + TKafkaInt16 apiKey = *(TKafkaInt16*)Request.Buffer.Data(); + TKafkaVersion apiVersion = *(TKafkaVersion*)(Request.Buffer.Data() + sizeof(TKafkaInt16)); + + NormalizeNumber(apiKey); + NormalizeNumber(apiVersion); + + KAFKA_LOG_D("received message. ApiKey=" << Request.Header.RequestApiKey + << ", Version=" << Request.Header.RequestApiVersion); + + // Print("received", Request.Buffer, Request.ExpectedSize); + + TKafkaReadable readable(Request.Buffer); + + Request.Message = CreateRequest(apiKey); + try { + Request.Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion)); + Request.Message->Read(readable, apiVersion); + } catch(const yexception& e) { + KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request.Header.RequestApiKey + << ", Version=" << Request.Header.RequestApiVersion + << ", Error=" << e.what()); + return PassAway(); + } + + ProcessRequest(); + + Step = SIZE_READ; + break; } } } @@ -420,8 +405,9 @@ protected: } }; -NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { - return new TKafkaConnection(std::move(socket), std::move(address)); +NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, + const NKikimrConfig::TKafkaProxyConfig& config) { + return new TKafkaConnection(std::move(socket), std::move(address), config); } } // namespace NKafka
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h index e23cf9995f..acece5b24e 100644 --- a/ydb/core/kafka_proxy/kafka_connection.h +++ b/ydb/core/kafka_proxy/kafka_connection.h @@ -1,13 +1,15 @@ #pragma once #include <library/cpp/actors/core/actor.h> +#include <ydb/core/protos/config.pb.h> #include <ydb/core/raw_socket/sock_config.h> #include <ydb/core/raw_socket/sock_impl.h> namespace NKafka { -using namespace NKikimr::NRawSocket; +using namespace NKikimr::NRawSocket; -NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address); +NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, + const NKikimrConfig::TKafkaProxyConfig& config); -} +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h index a5a36cfc3c..d5dde3ad41 100644 --- a/ydb/core/kafka_proxy/kafka_listener.h +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -7,11 +7,11 @@ namespace NKafka { using namespace NKikimr::NRawSocket; -inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings = {.Port = 9092}) { +inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) { return CreateSocketListener( poller, settings, - [](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { - return CreateKafkaConnection(socket, address); + [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return CreateKafkaConnection(socket, address, config); }, NKikimrServices::EServiceKikimr::KAFKA_PROXY); } diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 7c45f6719c..f4df5cef28 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -133,147 +133,25 @@ TRequestHeaderData::TRequestHeaderData() , ClientId(ClientIdMeta::Default) {} - -class TRequestHeaderData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TRequestHeaderData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TRequestHeaderData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TRequestHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TRequestHeaderData"; + } + NPrivate::Read<RequestApiKeyMeta>(_readable, _version, RequestApiKey); + NPrivate::Read<RequestApiVersionMeta>(_readable, _version, RequestApiVersion); + NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId); + NPrivate::Read<ClientIdMeta>(_readable, _version, ClientId); - NPrivate::TReadStrategy<RequestApiKeyMeta> RequestApiKey; - NPrivate::TReadStrategy<RequestApiVersionMeta> RequestApiVersion; - NPrivate::TReadStrategy<CorrelationIdMeta> CorrelationId; - NPrivate::TReadStrategy<ClientIdMeta> ClientId; -}; - -TRequestHeaderData::TReadContext::TReadContext(TRequestHeaderData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , RequestApiKey() - , RequestApiVersion() - , CorrelationId() - , ClientId() -{} - - -TReadDemand TRequestHeaderData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - RequestApiKey.Init<NPrivate::ReadFieldRule<RequestApiKeyMeta>>(Value.RequestApiKey, Version); - } - case 1: { - auto demand = RequestApiKey.Next<NPrivate::ReadFieldRule<RequestApiKeyMeta>>(Value.RequestApiKey, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - RequestApiVersion.Init<NPrivate::ReadFieldRule<RequestApiVersionMeta>>(Value.RequestApiVersion, Version); - } - case 3: { - auto demand = RequestApiVersion.Next<NPrivate::ReadFieldRule<RequestApiVersionMeta>>(Value.RequestApiVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - CorrelationId.Init<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version); - } - case 5: { - auto demand = CorrelationId.Next<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - ClientId.Init<NPrivate::ReadFieldRule<ClientIdMeta>>(Value.ClientId, Version); - } - case 7: { - auto demand = ClientId.Next<NPrivate::ReadFieldRule<ClientIdMeta>>(Value.ClientId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TRequestHeaderData::MessageMeta::FlexibleVersionMin, TRequestHeaderData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -306,9 +184,6 @@ i32 TRequestHeaderData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TRequestHeaderData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -320,105 +195,22 @@ TResponseHeaderData::TResponseHeaderData() : CorrelationId(CorrelationIdMeta::Default) {} - -class TResponseHeaderData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TResponseHeaderData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TResponseHeaderData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TResponseHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TResponseHeaderData"; + } + NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId); - NPrivate::TReadStrategy<CorrelationIdMeta> CorrelationId; -}; - -TResponseHeaderData::TReadContext::TReadContext(TResponseHeaderData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , CorrelationId() -{} - - -TReadDemand TResponseHeaderData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - CorrelationId.Init<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version); - } - case 1: { - auto demand = CorrelationId.Next<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - if (!NPrivate::VersionCheck<TResponseHeaderData::MessageMeta::FlexibleVersionMin, TResponseHeaderData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 3: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 5: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 7: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 4; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -445,9 +237,6 @@ i32 TResponseHeaderData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TResponseHeaderData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -463,147 +252,25 @@ TProduceRequestData::TProduceRequestData() , TimeoutMs(TimeoutMsMeta::Default) {} - -class TProduceRequestData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceRequestData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceRequestData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData"; + } + NPrivate::Read<TransactionalIdMeta>(_readable, _version, TransactionalId); + NPrivate::Read<AcksMeta>(_readable, _version, Acks); + NPrivate::Read<TimeoutMsMeta>(_readable, _version, TimeoutMs); + NPrivate::Read<TopicDataMeta>(_readable, _version, TopicData); - NPrivate::TReadStrategy<TransactionalIdMeta> TransactionalId; - NPrivate::TReadStrategy<AcksMeta> Acks; - NPrivate::TReadStrategy<TimeoutMsMeta> TimeoutMs; - NPrivate::TReadStrategy<TopicDataMeta> TopicData; -}; - -TProduceRequestData::TReadContext::TReadContext(TProduceRequestData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , TransactionalId() - , Acks() - , TimeoutMs() - , TopicData() -{} - - -TReadDemand TProduceRequestData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - TransactionalId.Init<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version); - } - case 1: { - auto demand = TransactionalId.Next<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Acks.Init<NPrivate::ReadFieldRule<AcksMeta>>(Value.Acks, Version); - } - case 3: { - auto demand = Acks.Next<NPrivate::ReadFieldRule<AcksMeta>>(Value.Acks, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - TimeoutMs.Init<NPrivate::ReadFieldRule<TimeoutMsMeta>>(Value.TimeoutMs, Version); - } - case 5: { - auto demand = TimeoutMs.Next<NPrivate::ReadFieldRule<TimeoutMsMeta>>(Value.TimeoutMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - TopicData.Init<NPrivate::ReadFieldRule<TopicDataMeta>>(Value.TopicData, Version); - } - case 7: { - auto demand = TopicData.Next<NPrivate::ReadFieldRule<TopicDataMeta>>(Value.TopicData, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TProduceRequestData::MessageMeta::FlexibleVersionMin, TProduceRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -636,9 +303,6 @@ i32 TProduceRequestData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceRequestData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -650,119 +314,23 @@ TProduceRequestData::TTopicProduceData::TTopicProduceData() : Name(NameMeta::Default) {} - -class TProduceRequestData::TTopicProduceData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceRequestData::TTopicProduceData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceRequestData::TTopicProduceData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceRequestData::TTopicProduceData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData::TTopicProduceData"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionDataMeta>(_readable, _version, PartitionData); - NPrivate::TReadStrategy<NameMeta> Name; - NPrivate::TReadStrategy<PartitionDataMeta> PartitionData; -}; - -TProduceRequestData::TTopicProduceData::TReadContext::TReadContext(TProduceRequestData::TTopicProduceData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Name() - , PartitionData() -{} - - -TReadDemand TProduceRequestData::TTopicProduceData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 1: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - PartitionData.Init<NPrivate::ReadFieldRule<PartitionDataMeta>>(Value.PartitionData, Version); - } - case 3: { - auto demand = PartitionData.Next<NPrivate::ReadFieldRule<PartitionDataMeta>>(Value.PartitionData, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TProduceRequestData::TTopicProduceData::MessageMeta::FlexibleVersionMin, TProduceRequestData::TTopicProduceData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -791,9 +359,6 @@ i32 TProduceRequestData::TTopicProduceData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceRequestData::TTopicProduceData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -805,119 +370,23 @@ TProduceRequestData::TTopicProduceData::TPartitionProduceData::TPartitionProduce : Index(IndexMeta::Default) {} - -class TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceRequestData::TTopicProduceData::TPartitionProduceData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceRequestData::TTopicProduceData::TPartitionProduceData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceRequestData::TTopicProduceData::TPartitionProduceData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData::TTopicProduceData::TPartitionProduceData"; + } + NPrivate::Read<IndexMeta>(_readable, _version, Index); + NPrivate::Read<RecordsMeta>(_readable, _version, Records); - NPrivate::TReadStrategy<IndexMeta> Index; - NPrivate::TReadStrategy<RecordsMeta> Records; -}; - -TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext::TReadContext(TProduceRequestData::TTopicProduceData::TPartitionProduceData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Index() - , Records() -{} - - -TReadDemand TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Index.Init<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version); - } - case 1: { - auto demand = Index.Next<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Records.Init<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version); - } - case 3: { - auto demand = Records.Next<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TProduceRequestData::TTopicProduceData::TPartitionProduceData::MessageMeta::FlexibleVersionMin, TProduceRequestData::TTopicProduceData::TPartitionProduceData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -946,9 +415,6 @@ i32 TProduceRequestData::TTopicProduceData::TPartitionProduceData::Size(TKafkaVe } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceRequestData::TTopicProduceData::TPartitionProduceData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -960,119 +426,23 @@ TProduceResponseData::TProduceResponseData() : ThrottleTimeMs(ThrottleTimeMsMeta::Default) {} - -class TProduceResponseData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceResponseData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceResponseData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData"; + } + NPrivate::Read<ResponsesMeta>(_readable, _version, Responses); + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); - NPrivate::TReadStrategy<ResponsesMeta> Responses; - NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs; -}; - -TProduceResponseData::TReadContext::TReadContext(TProduceResponseData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Responses() - , ThrottleTimeMs() -{} - - -TReadDemand TProduceResponseData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Responses.Init<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version); - } - case 1: { - auto demand = Responses.Next<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - } - case 3: { - auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TProduceResponseData::MessageMeta::FlexibleVersionMin, TProduceResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -1101,9 +471,6 @@ i32 TProduceResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceResponseData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -1115,119 +482,23 @@ TProduceResponseData::TTopicProduceResponse::TTopicProduceResponse() : Name(NameMeta::Default) {} - -class TProduceResponseData::TTopicProduceResponse::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceResponseData::TTopicProduceResponse& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceResponseData::TTopicProduceResponse& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceResponseData::TTopicProduceResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionResponsesMeta>(_readable, _version, PartitionResponses); - NPrivate::TReadStrategy<NameMeta> Name; - NPrivate::TReadStrategy<PartitionResponsesMeta> PartitionResponses; -}; - -TProduceResponseData::TTopicProduceResponse::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Name() - , PartitionResponses() -{} - - -TReadDemand TProduceResponseData::TTopicProduceResponse::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 1: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - PartitionResponses.Init<NPrivate::ReadFieldRule<PartitionResponsesMeta>>(Value.PartitionResponses, Version); - } - case 3: { - auto demand = PartitionResponses.Next<NPrivate::ReadFieldRule<PartitionResponsesMeta>>(Value.PartitionResponses, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -1256,9 +527,6 @@ i32 TProduceResponseData::TTopicProduceResponse::Size(TKafkaVersion _version) co } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -1280,189 +548,28 @@ TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TPartiti , ErrorMessage(ErrorMessageMeta::Default) {} - -class TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<IndexMeta> Index; - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<BaseOffsetMeta> BaseOffset; - NPrivate::TReadStrategy<LogAppendTimeMsMeta> LogAppendTimeMs; - NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset; - NPrivate::TReadStrategy<RecordErrorsMeta> RecordErrors; - NPrivate::TReadStrategy<ErrorMessageMeta> ErrorMessage; -}; - -TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Index() - , ErrorCode() - , BaseOffset() - , LogAppendTimeMs() - , LogStartOffset() - , RecordErrors() - , ErrorMessage() -{} - - -TReadDemand TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Index.Init<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version); - } - case 1: { - auto demand = Index.Next<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 3: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - BaseOffset.Init<NPrivate::ReadFieldRule<BaseOffsetMeta>>(Value.BaseOffset, Version); - } - case 5: { - auto demand = BaseOffset.Next<NPrivate::ReadFieldRule<BaseOffsetMeta>>(Value.BaseOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - LogAppendTimeMs.Init<NPrivate::ReadFieldRule<LogAppendTimeMsMeta>>(Value.LogAppendTimeMs, Version); - } - case 7: { - auto demand = LogAppendTimeMs.Next<NPrivate::ReadFieldRule<LogAppendTimeMsMeta>>(Value.LogAppendTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - } - case 9: { - auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - RecordErrors.Init<NPrivate::ReadFieldRule<RecordErrorsMeta>>(Value.RecordErrors, Version); - } - case 11: { - auto demand = RecordErrors.Next<NPrivate::ReadFieldRule<RecordErrorsMeta>>(Value.RecordErrors, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - ++Step; - ErrorMessage.Init<NPrivate::ReadFieldRule<ErrorMessageMeta>>(Value.ErrorMessage, Version); - } - case 13: { - auto demand = ErrorMessage.Next<NPrivate::ReadFieldRule<ErrorMessageMeta>>(Value.ErrorMessage, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 15: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 17: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 18: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 19: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 16; +void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse"; + } + NPrivate::Read<IndexMeta>(_readable, _version, Index); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<BaseOffsetMeta>(_readable, _version, BaseOffset); + NPrivate::Read<LogAppendTimeMsMeta>(_readable, _version, LogAppendTimeMs); + NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset); + NPrivate::Read<RecordErrorsMeta>(_readable, _version, RecordErrors); + NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -1501,9 +608,6 @@ i32 TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Size } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -1517,119 +621,23 @@ TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIn , BatchIndexErrorMessage(BatchIndexErrorMessageMeta::Default) {} - -class TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage"; + } + NPrivate::Read<BatchIndexMeta>(_readable, _version, BatchIndex); + NPrivate::Read<BatchIndexErrorMessageMeta>(_readable, _version, BatchIndexErrorMessage); - NPrivate::TReadStrategy<BatchIndexMeta> BatchIndex; - NPrivate::TReadStrategy<BatchIndexErrorMessageMeta> BatchIndexErrorMessage; -}; - -TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , BatchIndex() - , BatchIndexErrorMessage() -{} - - -TReadDemand TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - BatchIndex.Init<NPrivate::ReadFieldRule<BatchIndexMeta>>(Value.BatchIndex, Version); - } - case 1: { - auto demand = BatchIndex.Next<NPrivate::ReadFieldRule<BatchIndexMeta>>(Value.BatchIndex, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - BatchIndexErrorMessage.Init<NPrivate::ReadFieldRule<BatchIndexErrorMessageMeta>>(Value.BatchIndexErrorMessage, Version); - } - case 3: { - auto demand = BatchIndexErrorMessage.Next<NPrivate::ReadFieldRule<BatchIndexErrorMessageMeta>>(Value.BatchIndexErrorMessage, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -1658,9 +666,6 @@ i32 TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBat } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -1688,253 +693,35 @@ TFetchRequestData::TFetchRequestData() , RackId(RackIdMeta::Default) {} - -class TFetchRequestData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchRequestData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchRequestData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<ClusterIdMeta> ClusterId; - NPrivate::TReadStrategy<ReplicaIdMeta> ReplicaId; - NPrivate::TReadStrategy<MaxWaitMsMeta> MaxWaitMs; - NPrivate::TReadStrategy<MinBytesMeta> MinBytes; - NPrivate::TReadStrategy<MaxBytesMeta> MaxBytes; - NPrivate::TReadStrategy<IsolationLevelMeta> IsolationLevel; - NPrivate::TReadStrategy<SessionIdMeta> SessionId; - NPrivate::TReadStrategy<SessionEpochMeta> SessionEpoch; - NPrivate::TReadStrategy<TopicsMeta> Topics; - NPrivate::TReadStrategy<ForgottenTopicsDataMeta> ForgottenTopicsData; - NPrivate::TReadStrategy<RackIdMeta> RackId; -}; - -TFetchRequestData::TReadContext::TReadContext(TFetchRequestData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ClusterId() - , ReplicaId() - , MaxWaitMs() - , MinBytes() - , MaxBytes() - , IsolationLevel() - , SessionId() - , SessionEpoch() - , Topics() - , ForgottenTopicsData() - , RackId() -{} - - -TReadDemand TFetchRequestData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ClusterId.Init<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - } - case 1: { - auto demand = ClusterId.Next<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ReplicaId.Init<NPrivate::ReadFieldRule<ReplicaIdMeta>>(Value.ReplicaId, Version); - } - case 3: { - auto demand = ReplicaId.Next<NPrivate::ReadFieldRule<ReplicaIdMeta>>(Value.ReplicaId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - MaxWaitMs.Init<NPrivate::ReadFieldRule<MaxWaitMsMeta>>(Value.MaxWaitMs, Version); - } - case 5: { - auto demand = MaxWaitMs.Next<NPrivate::ReadFieldRule<MaxWaitMsMeta>>(Value.MaxWaitMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - MinBytes.Init<NPrivate::ReadFieldRule<MinBytesMeta>>(Value.MinBytes, Version); - } - case 7: { - auto demand = MinBytes.Next<NPrivate::ReadFieldRule<MinBytesMeta>>(Value.MinBytes, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - MaxBytes.Init<NPrivate::ReadFieldRule<MaxBytesMeta>>(Value.MaxBytes, Version); - } - case 9: { - auto demand = MaxBytes.Next<NPrivate::ReadFieldRule<MaxBytesMeta>>(Value.MaxBytes, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - IsolationLevel.Init<NPrivate::ReadFieldRule<IsolationLevelMeta>>(Value.IsolationLevel, Version); - } - case 11: { - auto demand = IsolationLevel.Next<NPrivate::ReadFieldRule<IsolationLevelMeta>>(Value.IsolationLevel, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - ++Step; - SessionId.Init<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version); - } - case 13: { - auto demand = SessionId.Next<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - ++Step; - SessionEpoch.Init<NPrivate::ReadFieldRule<SessionEpochMeta>>(Value.SessionEpoch, Version); - } - case 15: { - auto demand = SessionEpoch.Next<NPrivate::ReadFieldRule<SessionEpochMeta>>(Value.SessionEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - ++Step; - Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - } - case 17: { - auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 18: { - ++Step; - ForgottenTopicsData.Init<NPrivate::ReadFieldRule<ForgottenTopicsDataMeta>>(Value.ForgottenTopicsData, Version); - } - case 19: { - auto demand = ForgottenTopicsData.Next<NPrivate::ReadFieldRule<ForgottenTopicsDataMeta>>(Value.ForgottenTopicsData, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 20: { - ++Step; - RackId.Init<NPrivate::ReadFieldRule<RackIdMeta>>(Value.RackId, Version); - } - case 21: { - auto demand = RackId.Next<NPrivate::ReadFieldRule<RackIdMeta>>(Value.RackId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 22: { - if (!NPrivate::VersionCheck<TFetchRequestData::MessageMeta::FlexibleVersionMin, TFetchRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 23: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 24: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 25: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 26: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 27: { - TReadDemand demand; - switch(Tag_.Value) { - case 0: { - if (!TagInitialized_) { - TagInitialized_=true; - ClusterId.Init<NPrivate::ReadTaggedFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - } - demand = ClusterId.Next<NPrivate::ReadTaggedFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - break; - } - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 24; +void TFetchRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData"; + } + NPrivate::Read<ClusterIdMeta>(_readable, _version, ClusterId); + NPrivate::Read<ReplicaIdMeta>(_readable, _version, ReplicaId); + NPrivate::Read<MaxWaitMsMeta>(_readable, _version, MaxWaitMs); + NPrivate::Read<MinBytesMeta>(_readable, _version, MinBytes); + NPrivate::Read<MaxBytesMeta>(_readable, _version, MaxBytes); + NPrivate::Read<IsolationLevelMeta>(_readable, _version, IsolationLevel); + NPrivate::Read<SessionIdMeta>(_readable, _version, SessionId); + NPrivate::Read<SessionEpochMeta>(_readable, _version, SessionEpoch); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + NPrivate::Read<ForgottenTopicsDataMeta>(_readable, _version, ForgottenTopicsData); + NPrivate::Read<RackIdMeta>(_readable, _version, RackId); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + case ClusterIdMeta::Tag: + NPrivate::ReadTag<ClusterIdMeta>(_readable, _version, ClusterId); + break; + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -1982,9 +769,6 @@ i32 TFetchRequestData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchRequestData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -1998,133 +782,24 @@ TFetchRequestData::TFetchTopic::TFetchTopic() , TopicId(TopicIdMeta::Default) {} - -class TFetchRequestData::TFetchTopic::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchRequestData::TFetchTopic& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchRequestData::TFetchTopic& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchRequestData::TFetchTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TFetchTopic"; + } + NPrivate::Read<TopicMeta>(_readable, _version, Topic); + NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); - NPrivate::TReadStrategy<TopicMeta> Topic; - NPrivate::TReadStrategy<TopicIdMeta> TopicId; - NPrivate::TReadStrategy<PartitionsMeta> Partitions; -}; - -TFetchRequestData::TFetchTopic::TReadContext::TReadContext(TFetchRequestData::TFetchTopic& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Topic() - , TopicId() - , Partitions() -{} - - -TReadDemand TFetchRequestData::TFetchTopic::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - } - case 1: { - auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - } - case 3: { - auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - } - case 5: { - auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TFetchRequestData::TFetchTopic::MessageMeta::FlexibleVersionMin, TFetchRequestData::TFetchTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -2155,9 +830,6 @@ i32 TFetchRequestData::TFetchTopic::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TFetchTopic::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -2179,175 +851,27 @@ TFetchRequestData::TFetchTopic::TFetchPartition::TFetchPartition() , PartitionMaxBytes(PartitionMaxBytesMeta::Default) {} - -class TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchRequestData::TFetchTopic::TFetchPartition& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchRequestData::TFetchTopic::TFetchPartition& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<PartitionMeta> Partition; - NPrivate::TReadStrategy<CurrentLeaderEpochMeta> CurrentLeaderEpoch; - NPrivate::TReadStrategy<FetchOffsetMeta> FetchOffset; - NPrivate::TReadStrategy<LastFetchedEpochMeta> LastFetchedEpoch; - NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset; - NPrivate::TReadStrategy<PartitionMaxBytesMeta> PartitionMaxBytes; -}; - -TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext::TReadContext(TFetchRequestData::TFetchTopic::TFetchPartition& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Partition() - , CurrentLeaderEpoch() - , FetchOffset() - , LastFetchedEpoch() - , LogStartOffset() - , PartitionMaxBytes() -{} - - -TReadDemand TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Partition.Init<NPrivate::ReadFieldRule<PartitionMeta>>(Value.Partition, Version); - } - case 1: { - auto demand = Partition.Next<NPrivate::ReadFieldRule<PartitionMeta>>(Value.Partition, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - CurrentLeaderEpoch.Init<NPrivate::ReadFieldRule<CurrentLeaderEpochMeta>>(Value.CurrentLeaderEpoch, Version); - } - case 3: { - auto demand = CurrentLeaderEpoch.Next<NPrivate::ReadFieldRule<CurrentLeaderEpochMeta>>(Value.CurrentLeaderEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - FetchOffset.Init<NPrivate::ReadFieldRule<FetchOffsetMeta>>(Value.FetchOffset, Version); - } - case 5: { - auto demand = FetchOffset.Next<NPrivate::ReadFieldRule<FetchOffsetMeta>>(Value.FetchOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - LastFetchedEpoch.Init<NPrivate::ReadFieldRule<LastFetchedEpochMeta>>(Value.LastFetchedEpoch, Version); - } - case 7: { - auto demand = LastFetchedEpoch.Next<NPrivate::ReadFieldRule<LastFetchedEpochMeta>>(Value.LastFetchedEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - } - case 9: { - auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - PartitionMaxBytes.Init<NPrivate::ReadFieldRule<PartitionMaxBytesMeta>>(Value.PartitionMaxBytes, Version); - } - case 11: { - auto demand = PartitionMaxBytes.Next<NPrivate::ReadFieldRule<PartitionMaxBytesMeta>>(Value.PartitionMaxBytes, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - if (!NPrivate::VersionCheck<TFetchRequestData::TFetchTopic::TFetchPartition::MessageMeta::FlexibleVersionMin, TFetchRequestData::TFetchTopic::TFetchPartition::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 13: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 15: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 17: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 14; +void TFetchRequestData::TFetchTopic::TFetchPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TFetchTopic::TFetchPartition"; + } + NPrivate::Read<PartitionMeta>(_readable, _version, Partition); + NPrivate::Read<CurrentLeaderEpochMeta>(_readable, _version, CurrentLeaderEpoch); + NPrivate::Read<FetchOffsetMeta>(_readable, _version, FetchOffset); + NPrivate::Read<LastFetchedEpochMeta>(_readable, _version, LastFetchedEpoch); + NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset); + NPrivate::Read<PartitionMaxBytesMeta>(_readable, _version, PartitionMaxBytes); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -2384,9 +908,6 @@ i32 TFetchRequestData::TFetchTopic::TFetchPartition::Size(TKafkaVersion _version } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TFetchTopic::TFetchPartition::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -2400,133 +921,24 @@ TFetchRequestData::TForgottenTopic::TForgottenTopic() , TopicId(TopicIdMeta::Default) {} - -class TFetchRequestData::TForgottenTopic::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchRequestData::TForgottenTopic& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchRequestData::TForgottenTopic& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchRequestData::TForgottenTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TForgottenTopic"; + } + NPrivate::Read<TopicMeta>(_readable, _version, Topic); + NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); - NPrivate::TReadStrategy<TopicMeta> Topic; - NPrivate::TReadStrategy<TopicIdMeta> TopicId; - NPrivate::TReadStrategy<PartitionsMeta> Partitions; -}; - -TFetchRequestData::TForgottenTopic::TReadContext::TReadContext(TFetchRequestData::TForgottenTopic& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Topic() - , TopicId() - , Partitions() -{} - - -TReadDemand TFetchRequestData::TForgottenTopic::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - } - case 1: { - auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - } - case 3: { - auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - } - case 5: { - auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TFetchRequestData::TForgottenTopic::MessageMeta::FlexibleVersionMin, TFetchRequestData::TForgottenTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -2557,9 +969,6 @@ i32 TFetchRequestData::TForgottenTopic::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TForgottenTopic::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -2575,147 +984,25 @@ TFetchResponseData::TFetchResponseData() , SessionId(SessionIdMeta::Default) {} - -class TFetchResponseData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<SessionIdMeta>(_readable, _version, SessionId); + NPrivate::Read<ResponsesMeta>(_readable, _version, Responses); - NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs; - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<SessionIdMeta> SessionId; - NPrivate::TReadStrategy<ResponsesMeta> Responses; -}; - -TFetchResponseData::TReadContext::TReadContext(TFetchResponseData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ThrottleTimeMs() - , ErrorCode() - , SessionId() - , Responses() -{} - - -TReadDemand TFetchResponseData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - } - case 1: { - auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 3: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - SessionId.Init<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version); - } - case 5: { - auto demand = SessionId.Next<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - Responses.Init<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version); - } - case 7: { - auto demand = Responses.Next<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TFetchResponseData::MessageMeta::FlexibleVersionMin, TFetchResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -2748,9 +1035,6 @@ i32 TFetchResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -2764,133 +1048,24 @@ TFetchResponseData::TFetchableTopicResponse::TFetchableTopicResponse() , TopicId(TopicIdMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::TFetchableTopicResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse"; + } + NPrivate::Read<TopicMeta>(_readable, _version, Topic); + NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); - NPrivate::TReadStrategy<TopicMeta> Topic; - NPrivate::TReadStrategy<TopicIdMeta> TopicId; - NPrivate::TReadStrategy<PartitionsMeta> Partitions; -}; - -TFetchResponseData::TFetchableTopicResponse::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Topic() - , TopicId() - , Partitions() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - } - case 1: { - auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - } - case 3: { - auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - } - case 5: { - auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -2921,9 +1096,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::Size(TKafkaVersion _version) co } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -2945,269 +1117,41 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TPartitionData() , PreferredReadReplica(PreferredReadReplicaMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse::TPartitionData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<PartitionIndexMeta> PartitionIndex; - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<HighWatermarkMeta> HighWatermark; - NPrivate::TReadStrategy<LastStableOffsetMeta> LastStableOffset; - NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset; - NPrivate::TReadStrategy<DivergingEpochMeta> DivergingEpoch; - NPrivate::TReadStrategy<CurrentLeaderMeta> CurrentLeader; - NPrivate::TReadStrategy<SnapshotIdMeta> SnapshotId; - NPrivate::TReadStrategy<AbortedTransactionsMeta> AbortedTransactions; - NPrivate::TReadStrategy<PreferredReadReplicaMeta> PreferredReadReplica; - NPrivate::TReadStrategy<RecordsMeta> Records; -}; - -TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , PartitionIndex() - , ErrorCode() - , HighWatermark() - , LastStableOffset() - , LogStartOffset() - , DivergingEpoch() - , CurrentLeader() - , SnapshotId() - , AbortedTransactions() - , PreferredReadReplica() - , Records() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - PartitionIndex.Init<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version); - } - case 1: { - auto demand = PartitionIndex.Next<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 3: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - HighWatermark.Init<NPrivate::ReadFieldRule<HighWatermarkMeta>>(Value.HighWatermark, Version); - } - case 5: { - auto demand = HighWatermark.Next<NPrivate::ReadFieldRule<HighWatermarkMeta>>(Value.HighWatermark, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - LastStableOffset.Init<NPrivate::ReadFieldRule<LastStableOffsetMeta>>(Value.LastStableOffset, Version); - } - case 7: { - auto demand = LastStableOffset.Next<NPrivate::ReadFieldRule<LastStableOffsetMeta>>(Value.LastStableOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - } - case 9: { - auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - DivergingEpoch.Init<NPrivate::ReadFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version); - } - case 11: { - auto demand = DivergingEpoch.Next<NPrivate::ReadFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - ++Step; - CurrentLeader.Init<NPrivate::ReadFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version); - } - case 13: { - auto demand = CurrentLeader.Next<NPrivate::ReadFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - ++Step; - SnapshotId.Init<NPrivate::ReadFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version); - } - case 15: { - auto demand = SnapshotId.Next<NPrivate::ReadFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - ++Step; - AbortedTransactions.Init<NPrivate::ReadFieldRule<AbortedTransactionsMeta>>(Value.AbortedTransactions, Version); - } - case 17: { - auto demand = AbortedTransactions.Next<NPrivate::ReadFieldRule<AbortedTransactionsMeta>>(Value.AbortedTransactions, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 18: { - ++Step; - PreferredReadReplica.Init<NPrivate::ReadFieldRule<PreferredReadReplicaMeta>>(Value.PreferredReadReplica, Version); - } - case 19: { - auto demand = PreferredReadReplica.Next<NPrivate::ReadFieldRule<PreferredReadReplicaMeta>>(Value.PreferredReadReplica, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 20: { - ++Step; - Records.Init<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version); - } - case 21: { - auto demand = Records.Next<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 22: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 23: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 24: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 25: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 26: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 27: { - TReadDemand demand; - switch(Tag_.Value) { - case 0: { - if (!TagInitialized_) { - TagInitialized_=true; - DivergingEpoch.Init<NPrivate::ReadTaggedFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version); - } - demand = DivergingEpoch.Next<NPrivate::ReadTaggedFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version); - break; - } - case 1: { - if (!TagInitialized_) { - TagInitialized_=true; - CurrentLeader.Init<NPrivate::ReadTaggedFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version); - } - demand = CurrentLeader.Next<NPrivate::ReadTaggedFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version); - break; - } - case 2: { - if (!TagInitialized_) { - TagInitialized_=true; - SnapshotId.Init<NPrivate::ReadTaggedFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version); - } - demand = SnapshotId.Next<NPrivate::ReadTaggedFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version); - break; - } - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 24; +void TFetchResponseData::TFetchableTopicResponse::TPartitionData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData"; + } + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<HighWatermarkMeta>(_readable, _version, HighWatermark); + NPrivate::Read<LastStableOffsetMeta>(_readable, _version, LastStableOffset); + NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset); + NPrivate::Read<DivergingEpochMeta>(_readable, _version, DivergingEpoch); + NPrivate::Read<CurrentLeaderMeta>(_readable, _version, CurrentLeader); + NPrivate::Read<SnapshotIdMeta>(_readable, _version, SnapshotId); + NPrivate::Read<AbortedTransactionsMeta>(_readable, _version, AbortedTransactions); + NPrivate::Read<PreferredReadReplicaMeta>(_readable, _version, PreferredReadReplica); + NPrivate::Read<RecordsMeta>(_readable, _version, Records); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + case DivergingEpochMeta::Tag: + NPrivate::ReadTag<DivergingEpochMeta>(_readable, _version, DivergingEpoch); + break; + case CurrentLeaderMeta::Tag: + NPrivate::ReadTag<CurrentLeaderMeta>(_readable, _version, CurrentLeader); + break; + case SnapshotIdMeta::Tag: + NPrivate::ReadTag<SnapshotIdMeta>(_readable, _version, SnapshotId); + break; + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -3257,9 +1201,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::Size(TKafkaVers } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -3273,119 +1214,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TE , EndOffset(EndOffsetMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset"; + } + NPrivate::Read<EpochMeta>(_readable, _version, Epoch); + NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset); - NPrivate::TReadStrategy<EpochMeta> Epoch; - NPrivate::TReadStrategy<EndOffsetMeta> EndOffset; -}; - -TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Epoch() - , EndOffset() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Epoch.Init<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version); - } - case 1: { - auto demand = Epoch.Next<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - EndOffset.Init<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version); - } - case 3: { - auto demand = EndOffset.Next<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -3414,9 +1259,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -3430,119 +1272,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch:: , LeaderEpoch(LeaderEpochMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch"; + } + NPrivate::Read<LeaderIdMeta>(_readable, _version, LeaderId); + NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch); - NPrivate::TReadStrategy<LeaderIdMeta> LeaderId; - NPrivate::TReadStrategy<LeaderEpochMeta> LeaderEpoch; -}; - -TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , LeaderId() - , LeaderEpoch() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - LeaderId.Init<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version); - } - case 1: { - auto demand = LeaderId.Next<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - LeaderEpoch.Init<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version); - } - case 3: { - auto demand = LeaderEpoch.Next<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -3571,9 +1317,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpo } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -3587,119 +1330,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TSnaps , Epoch(EpochMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId"; + } + NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset); + NPrivate::Read<EpochMeta>(_readable, _version, Epoch); - NPrivate::TReadStrategy<EndOffsetMeta> EndOffset; - NPrivate::TReadStrategy<EpochMeta> Epoch; -}; - -TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , EndOffset() - , Epoch() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - EndOffset.Init<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version); - } - case 1: { - auto demand = EndOffset.Next<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Epoch.Init<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version); - } - case 3: { - auto demand = Epoch.Next<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -3728,9 +1375,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::Si } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -3744,119 +1388,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction , FirstOffset(FirstOffsetMeta::Default) {} - -class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction"; + } + NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId); + NPrivate::Read<FirstOffsetMeta>(_readable, _version, FirstOffset); - NPrivate::TReadStrategy<ProducerIdMeta> ProducerId; - NPrivate::TReadStrategy<FirstOffsetMeta> FirstOffset; -}; - -TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ProducerId() - , FirstOffset() -{} - - -TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - } - case 1: { - auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - FirstOffset.Init<NPrivate::ReadFieldRule<FirstOffsetMeta>>(Value.FirstOffset, Version); - } - case 3: { - auto demand = FirstOffset.Next<NPrivate::ReadFieldRule<FirstOffsetMeta>>(Value.FirstOffset, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -3885,9 +1433,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransac } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -3903,147 +1448,25 @@ TMetadataRequestData::TMetadataRequestData() , IncludeTopicAuthorizedOperations(IncludeTopicAuthorizedOperationsMeta::Default) {} - -class TMetadataRequestData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataRequestData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataRequestData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TMetadataRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataRequestData"; + } + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + NPrivate::Read<AllowAutoTopicCreationMeta>(_readable, _version, AllowAutoTopicCreation); + NPrivate::Read<IncludeClusterAuthorizedOperationsMeta>(_readable, _version, IncludeClusterAuthorizedOperations); + NPrivate::Read<IncludeTopicAuthorizedOperationsMeta>(_readable, _version, IncludeTopicAuthorizedOperations); - NPrivate::TReadStrategy<TopicsMeta> Topics; - NPrivate::TReadStrategy<AllowAutoTopicCreationMeta> AllowAutoTopicCreation; - NPrivate::TReadStrategy<IncludeClusterAuthorizedOperationsMeta> IncludeClusterAuthorizedOperations; - NPrivate::TReadStrategy<IncludeTopicAuthorizedOperationsMeta> IncludeTopicAuthorizedOperations; -}; - -TMetadataRequestData::TReadContext::TReadContext(TMetadataRequestData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Topics() - , AllowAutoTopicCreation() - , IncludeClusterAuthorizedOperations() - , IncludeTopicAuthorizedOperations() -{} - - -TReadDemand TMetadataRequestData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - } - case 1: { - auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - AllowAutoTopicCreation.Init<NPrivate::ReadFieldRule<AllowAutoTopicCreationMeta>>(Value.AllowAutoTopicCreation, Version); - } - case 3: { - auto demand = AllowAutoTopicCreation.Next<NPrivate::ReadFieldRule<AllowAutoTopicCreationMeta>>(Value.AllowAutoTopicCreation, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - IncludeClusterAuthorizedOperations.Init<NPrivate::ReadFieldRule<IncludeClusterAuthorizedOperationsMeta>>(Value.IncludeClusterAuthorizedOperations, Version); - } - case 5: { - auto demand = IncludeClusterAuthorizedOperations.Next<NPrivate::ReadFieldRule<IncludeClusterAuthorizedOperationsMeta>>(Value.IncludeClusterAuthorizedOperations, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - IncludeTopicAuthorizedOperations.Init<NPrivate::ReadFieldRule<IncludeTopicAuthorizedOperationsMeta>>(Value.IncludeTopicAuthorizedOperations, Version); - } - case 7: { - auto demand = IncludeTopicAuthorizedOperations.Next<NPrivate::ReadFieldRule<IncludeTopicAuthorizedOperationsMeta>>(Value.IncludeTopicAuthorizedOperations, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TMetadataRequestData::MessageMeta::FlexibleVersionMin, TMetadataRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -4076,9 +1499,6 @@ i32 TMetadataRequestData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataRequestData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -4092,119 +1512,23 @@ TMetadataRequestData::TMetadataRequestTopic::TMetadataRequestTopic() , Name(NameMeta::Default) {} - -class TMetadataRequestData::TMetadataRequestTopic::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataRequestData::TMetadataRequestTopic& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataRequestData::TMetadataRequestTopic& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TMetadataRequestData::TMetadataRequestTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataRequestData::TMetadataRequestTopic"; + } + NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId); + NPrivate::Read<NameMeta>(_readable, _version, Name); - NPrivate::TReadStrategy<TopicIdMeta> TopicId; - NPrivate::TReadStrategy<NameMeta> Name; -}; - -TMetadataRequestData::TMetadataRequestTopic::TReadContext::TReadContext(TMetadataRequestData::TMetadataRequestTopic& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , TopicId() - , Name() -{} - - -TReadDemand TMetadataRequestData::TMetadataRequestTopic::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - } - case 1: { - auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 3: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TMetadataRequestData::TMetadataRequestTopic::MessageMeta::FlexibleVersionMin, TMetadataRequestData::TMetadataRequestTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -4233,9 +1557,6 @@ i32 TMetadataRequestData::TMetadataRequestTopic::Size(TKafkaVersion _version) co } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataRequestData::TMetadataRequestTopic::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -4253,175 +1574,27 @@ TMetadataResponseData::TMetadataResponseData() , ClusterAuthorizedOperations(ClusterAuthorizedOperationsMeta::Default) {} - -class TMetadataResponseData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataResponseData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataResponseData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs; - NPrivate::TReadStrategy<BrokersMeta> Brokers; - NPrivate::TReadStrategy<ClusterIdMeta> ClusterId; - NPrivate::TReadStrategy<ControllerIdMeta> ControllerId; - NPrivate::TReadStrategy<TopicsMeta> Topics; - NPrivate::TReadStrategy<ClusterAuthorizedOperationsMeta> ClusterAuthorizedOperations; -}; - -TMetadataResponseData::TReadContext::TReadContext(TMetadataResponseData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ThrottleTimeMs() - , Brokers() - , ClusterId() - , ControllerId() - , Topics() - , ClusterAuthorizedOperations() -{} - - -TReadDemand TMetadataResponseData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - } - case 1: { - auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Brokers.Init<NPrivate::ReadFieldRule<BrokersMeta>>(Value.Brokers, Version); - } - case 3: { - auto demand = Brokers.Next<NPrivate::ReadFieldRule<BrokersMeta>>(Value.Brokers, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - ClusterId.Init<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - } - case 5: { - auto demand = ClusterId.Next<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - ControllerId.Init<NPrivate::ReadFieldRule<ControllerIdMeta>>(Value.ControllerId, Version); - } - case 7: { - auto demand = ControllerId.Next<NPrivate::ReadFieldRule<ControllerIdMeta>>(Value.ControllerId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - } - case 9: { - auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - ClusterAuthorizedOperations.Init<NPrivate::ReadFieldRule<ClusterAuthorizedOperationsMeta>>(Value.ClusterAuthorizedOperations, Version); - } - case 11: { - auto demand = ClusterAuthorizedOperations.Next<NPrivate::ReadFieldRule<ClusterAuthorizedOperationsMeta>>(Value.ClusterAuthorizedOperations, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - if (!NPrivate::VersionCheck<TMetadataResponseData::MessageMeta::FlexibleVersionMin, TMetadataResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 13: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 15: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 17: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 14; +void TMetadataResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<BrokersMeta>(_readable, _version, Brokers); + NPrivate::Read<ClusterIdMeta>(_readable, _version, ClusterId); + NPrivate::Read<ControllerIdMeta>(_readable, _version, ControllerId); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + NPrivate::Read<ClusterAuthorizedOperationsMeta>(_readable, _version, ClusterAuthorizedOperations); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -4458,9 +1631,6 @@ i32 TMetadataResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -4478,147 +1648,25 @@ TMetadataResponseData::TMetadataResponseBroker::TMetadataResponseBroker() , Rack(RackMeta::Default) {} - -class TMetadataResponseData::TMetadataResponseBroker::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataResponseData::TMetadataResponseBroker& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataResponseData::TMetadataResponseBroker& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TMetadataResponseData::TMetadataResponseBroker::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseBroker"; + } + NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId); + NPrivate::Read<HostMeta>(_readable, _version, Host); + NPrivate::Read<PortMeta>(_readable, _version, Port); + NPrivate::Read<RackMeta>(_readable, _version, Rack); - NPrivate::TReadStrategy<NodeIdMeta> NodeId; - NPrivate::TReadStrategy<HostMeta> Host; - NPrivate::TReadStrategy<PortMeta> Port; - NPrivate::TReadStrategy<RackMeta> Rack; -}; - -TMetadataResponseData::TMetadataResponseBroker::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseBroker& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , NodeId() - , Host() - , Port() - , Rack() -{} - - -TReadDemand TMetadataResponseData::TMetadataResponseBroker::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - NodeId.Init<NPrivate::ReadFieldRule<NodeIdMeta>>(Value.NodeId, Version); - } - case 1: { - auto demand = NodeId.Next<NPrivate::ReadFieldRule<NodeIdMeta>>(Value.NodeId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Host.Init<NPrivate::ReadFieldRule<HostMeta>>(Value.Host, Version); - } - case 3: { - auto demand = Host.Next<NPrivate::ReadFieldRule<HostMeta>>(Value.Host, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - Port.Init<NPrivate::ReadFieldRule<PortMeta>>(Value.Port, Version); - } - case 5: { - auto demand = Port.Next<NPrivate::ReadFieldRule<PortMeta>>(Value.Port, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - Rack.Init<NPrivate::ReadFieldRule<RackMeta>>(Value.Rack, Version); - } - case 7: { - auto demand = Rack.Next<NPrivate::ReadFieldRule<RackMeta>>(Value.Rack, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseBroker::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseBroker::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -4651,9 +1699,6 @@ i32 TMetadataResponseData::TMetadataResponseBroker::Size(TKafkaVersion _version) } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseBroker::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -4673,175 +1718,27 @@ TMetadataResponseData::TMetadataResponseTopic::TMetadataResponseTopic() , TopicAuthorizedOperations(TopicAuthorizedOperationsMeta::Default) {} - -class TMetadataResponseData::TMetadataResponseTopic::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataResponseData::TMetadataResponseTopic& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataResponseData::TMetadataResponseTopic& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<NameMeta> Name; - NPrivate::TReadStrategy<TopicIdMeta> TopicId; - NPrivate::TReadStrategy<IsInternalMeta> IsInternal; - NPrivate::TReadStrategy<PartitionsMeta> Partitions; - NPrivate::TReadStrategy<TopicAuthorizedOperationsMeta> TopicAuthorizedOperations; -}; - -TMetadataResponseData::TMetadataResponseTopic::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseTopic& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ErrorCode() - , Name() - , TopicId() - , IsInternal() - , Partitions() - , TopicAuthorizedOperations() -{} - - -TReadDemand TMetadataResponseData::TMetadataResponseTopic::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 1: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 3: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - } - case 5: { - auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - IsInternal.Init<NPrivate::ReadFieldRule<IsInternalMeta>>(Value.IsInternal, Version); - } - case 7: { - auto demand = IsInternal.Next<NPrivate::ReadFieldRule<IsInternalMeta>>(Value.IsInternal, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - } - case 9: { - auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - TopicAuthorizedOperations.Init<NPrivate::ReadFieldRule<TopicAuthorizedOperationsMeta>>(Value.TopicAuthorizedOperations, Version); - } - case 11: { - auto demand = TopicAuthorizedOperations.Next<NPrivate::ReadFieldRule<TopicAuthorizedOperationsMeta>>(Value.TopicAuthorizedOperations, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseTopic::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 13: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 15: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 17: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 14; +void TMetadataResponseData::TMetadataResponseTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseTopic"; + } + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId); + NPrivate::Read<IsInternalMeta>(_readable, _version, IsInternal); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + NPrivate::Read<TopicAuthorizedOperationsMeta>(_readable, _version, TopicAuthorizedOperations); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -4878,9 +1775,6 @@ i32 TMetadataResponseData::TMetadataResponseTopic::Size(TKafkaVersion _version) } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseTopic::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -4898,189 +1792,28 @@ TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TMeta , LeaderEpoch(LeaderEpochMeta::Default) {} - -class TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<PartitionIndexMeta> PartitionIndex; - NPrivate::TReadStrategy<LeaderIdMeta> LeaderId; - NPrivate::TReadStrategy<LeaderEpochMeta> LeaderEpoch; - NPrivate::TReadStrategy<ReplicaNodesMeta> ReplicaNodes; - NPrivate::TReadStrategy<IsrNodesMeta> IsrNodes; - NPrivate::TReadStrategy<OfflineReplicasMeta> OfflineReplicas; -}; - -TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ErrorCode() - , PartitionIndex() - , LeaderId() - , LeaderEpoch() - , ReplicaNodes() - , IsrNodes() - , OfflineReplicas() -{} - - -TReadDemand TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 1: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - PartitionIndex.Init<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version); - } - case 3: { - auto demand = PartitionIndex.Next<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - LeaderId.Init<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version); - } - case 5: { - auto demand = LeaderId.Next<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - LeaderEpoch.Init<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version); - } - case 7: { - auto demand = LeaderEpoch.Next<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - ReplicaNodes.Init<NPrivate::ReadFieldRule<ReplicaNodesMeta>>(Value.ReplicaNodes, Version); - } - case 9: { - auto demand = ReplicaNodes.Next<NPrivate::ReadFieldRule<ReplicaNodesMeta>>(Value.ReplicaNodes, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - IsrNodes.Init<NPrivate::ReadFieldRule<IsrNodesMeta>>(Value.IsrNodes, Version); - } - case 11: { - auto demand = IsrNodes.Next<NPrivate::ReadFieldRule<IsrNodesMeta>>(Value.IsrNodes, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - ++Step; - OfflineReplicas.Init<NPrivate::ReadFieldRule<OfflineReplicasMeta>>(Value.OfflineReplicas, Version); - } - case 13: { - auto demand = OfflineReplicas.Next<NPrivate::ReadFieldRule<OfflineReplicasMeta>>(Value.OfflineReplicas, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 15: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 17: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 18: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 19: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 16; +void TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition"; + } + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + NPrivate::Read<LeaderIdMeta>(_readable, _version, LeaderId); + NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch); + NPrivate::Read<ReplicaNodesMeta>(_readable, _version, ReplicaNodes); + NPrivate::Read<IsrNodesMeta>(_readable, _version, IsrNodes); + NPrivate::Read<OfflineReplicasMeta>(_readable, _version, OfflineReplicas); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -5119,9 +1852,6 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -5135,119 +1865,23 @@ TApiVersionsRequestData::TApiVersionsRequestData() , ClientSoftwareVersion(ClientSoftwareVersionMeta::Default) {} - -class TApiVersionsRequestData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TApiVersionsRequestData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TApiVersionsRequestData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TApiVersionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TApiVersionsRequestData"; + } + NPrivate::Read<ClientSoftwareNameMeta>(_readable, _version, ClientSoftwareName); + NPrivate::Read<ClientSoftwareVersionMeta>(_readable, _version, ClientSoftwareVersion); - NPrivate::TReadStrategy<ClientSoftwareNameMeta> ClientSoftwareName; - NPrivate::TReadStrategy<ClientSoftwareVersionMeta> ClientSoftwareVersion; -}; - -TApiVersionsRequestData::TReadContext::TReadContext(TApiVersionsRequestData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ClientSoftwareName() - , ClientSoftwareVersion() -{} - - -TReadDemand TApiVersionsRequestData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ClientSoftwareName.Init<NPrivate::ReadFieldRule<ClientSoftwareNameMeta>>(Value.ClientSoftwareName, Version); - } - case 1: { - auto demand = ClientSoftwareName.Next<NPrivate::ReadFieldRule<ClientSoftwareNameMeta>>(Value.ClientSoftwareName, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ClientSoftwareVersion.Init<NPrivate::ReadFieldRule<ClientSoftwareVersionMeta>>(Value.ClientSoftwareVersion, Version); - } - case 3: { - auto demand = ClientSoftwareVersion.Next<NPrivate::ReadFieldRule<ClientSoftwareVersionMeta>>(Value.ClientSoftwareVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - if (!NPrivate::VersionCheck<TApiVersionsRequestData::MessageMeta::FlexibleVersionMin, TApiVersionsRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 5: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 7: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 9: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 6; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -5276,9 +1910,6 @@ i32 TApiVersionsRequestData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TApiVersionsRequestData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -5296,221 +1927,40 @@ TApiVersionsResponseData::TApiVersionsResponseData() , ZkMigrationReady(ZkMigrationReadyMeta::Default) {} - -class TApiVersionsResponseData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TApiVersionsResponseData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TApiVersionsResponseData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; - - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<ApiKeysMeta> ApiKeys; - NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs; - NPrivate::TReadStrategy<SupportedFeaturesMeta> SupportedFeatures; - NPrivate::TReadStrategy<FinalizedFeaturesEpochMeta> FinalizedFeaturesEpoch; - NPrivate::TReadStrategy<FinalizedFeaturesMeta> FinalizedFeatures; - NPrivate::TReadStrategy<ZkMigrationReadyMeta> ZkMigrationReady; -}; - -TApiVersionsResponseData::TReadContext::TReadContext(TApiVersionsResponseData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ErrorCode() - , ApiKeys() - , ThrottleTimeMs() - , SupportedFeatures() - , FinalizedFeaturesEpoch() - , FinalizedFeatures() - , ZkMigrationReady() -{} - - -TReadDemand TApiVersionsResponseData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 1: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ApiKeys.Init<NPrivate::ReadFieldRule<ApiKeysMeta>>(Value.ApiKeys, Version); - } - case 3: { - auto demand = ApiKeys.Next<NPrivate::ReadFieldRule<ApiKeysMeta>>(Value.ApiKeys, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - } - case 5: { - auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - SupportedFeatures.Init<NPrivate::ReadFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version); - } - case 7: { - auto demand = SupportedFeatures.Next<NPrivate::ReadFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - FinalizedFeaturesEpoch.Init<NPrivate::ReadFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version); - } - case 9: { - auto demand = FinalizedFeaturesEpoch.Next<NPrivate::ReadFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - FinalizedFeatures.Init<NPrivate::ReadFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version); - } - case 11: { - auto demand = FinalizedFeatures.Next<NPrivate::ReadFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - ++Step; - ZkMigrationReady.Init<NPrivate::ReadFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version); - } - case 13: { - auto demand = ZkMigrationReady.Next<NPrivate::ReadFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 14: { - if (!NPrivate::VersionCheck<TApiVersionsResponseData::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 15: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 16: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 17: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 18: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 19: { - TReadDemand demand; - switch(Tag_.Value) { - case 0: { - if (!TagInitialized_) { - TagInitialized_=true; - SupportedFeatures.Init<NPrivate::ReadTaggedFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version); - } - demand = SupportedFeatures.Next<NPrivate::ReadTaggedFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version); - break; - } - case 1: { - if (!TagInitialized_) { - TagInitialized_=true; - FinalizedFeaturesEpoch.Init<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version); - } - demand = FinalizedFeaturesEpoch.Next<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version); - break; - } - case 2: { - if (!TagInitialized_) { - TagInitialized_=true; - FinalizedFeatures.Init<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version); - } - demand = FinalizedFeatures.Next<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version); - break; - } - case 3: { - if (!TagInitialized_) { - TagInitialized_=true; - ZkMigrationReady.Init<NPrivate::ReadTaggedFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version); - } - demand = ZkMigrationReady.Next<NPrivate::ReadTaggedFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version); - break; - } - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 16; +void TApiVersionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData"; + } + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ApiKeysMeta>(_readable, _version, ApiKeys); + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures); + NPrivate::Read<FinalizedFeaturesEpochMeta>(_readable, _version, FinalizedFeaturesEpoch); + NPrivate::Read<FinalizedFeaturesMeta>(_readable, _version, FinalizedFeatures); + NPrivate::Read<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + case SupportedFeaturesMeta::Tag: + NPrivate::ReadTag<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures); + break; + case FinalizedFeaturesEpochMeta::Tag: + NPrivate::ReadTag<FinalizedFeaturesEpochMeta>(_readable, _version, FinalizedFeaturesEpoch); + break; + case FinalizedFeaturesMeta::Tag: + NPrivate::ReadTag<FinalizedFeaturesMeta>(_readable, _version, FinalizedFeatures); + break; + case ZkMigrationReadyMeta::Tag: + NPrivate::ReadTag<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady); + break; + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -5553,9 +2003,6 @@ i32 TApiVersionsResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -5571,133 +2018,24 @@ TApiVersionsResponseData::TApiVersion::TApiVersion() , MaxVersion(MaxVersionMeta::Default) {} - -class TApiVersionsResponseData::TApiVersion::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TApiVersionsResponseData::TApiVersion& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TApiVersionsResponseData::TApiVersion& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TApiVersionsResponseData::TApiVersion::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TApiVersion"; + } + NPrivate::Read<ApiKeyMeta>(_readable, _version, ApiKey); + NPrivate::Read<MinVersionMeta>(_readable, _version, MinVersion); + NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion); - NPrivate::TReadStrategy<ApiKeyMeta> ApiKey; - NPrivate::TReadStrategy<MinVersionMeta> MinVersion; - NPrivate::TReadStrategy<MaxVersionMeta> MaxVersion; -}; - -TApiVersionsResponseData::TApiVersion::TReadContext::TReadContext(TApiVersionsResponseData::TApiVersion& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ApiKey() - , MinVersion() - , MaxVersion() -{} - - -TReadDemand TApiVersionsResponseData::TApiVersion::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ApiKey.Init<NPrivate::ReadFieldRule<ApiKeyMeta>>(Value.ApiKey, Version); - } - case 1: { - auto demand = ApiKey.Next<NPrivate::ReadFieldRule<ApiKeyMeta>>(Value.ApiKey, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - MinVersion.Init<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version); - } - case 3: { - auto demand = MinVersion.Next<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - MaxVersion.Init<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version); - } - case 5: { - auto demand = MaxVersion.Next<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TApiVersionsResponseData::TApiVersion::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TApiVersion::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -5728,9 +2066,6 @@ i32 TApiVersionsResponseData::TApiVersion::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TApiVersion::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -5746,133 +2081,24 @@ TApiVersionsResponseData::TSupportedFeatureKey::TSupportedFeatureKey() , MaxVersion(MaxVersionMeta::Default) {} - -class TApiVersionsResponseData::TSupportedFeatureKey::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TApiVersionsResponseData::TSupportedFeatureKey& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TApiVersionsResponseData::TSupportedFeatureKey& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TApiVersionsResponseData::TSupportedFeatureKey::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TSupportedFeatureKey"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<MinVersionMeta>(_readable, _version, MinVersion); + NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion); - NPrivate::TReadStrategy<NameMeta> Name; - NPrivate::TReadStrategy<MinVersionMeta> MinVersion; - NPrivate::TReadStrategy<MaxVersionMeta> MaxVersion; -}; - -TApiVersionsResponseData::TSupportedFeatureKey::TReadContext::TReadContext(TApiVersionsResponseData::TSupportedFeatureKey& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Name() - , MinVersion() - , MaxVersion() -{} - - -TReadDemand TApiVersionsResponseData::TSupportedFeatureKey::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 1: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - MinVersion.Init<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version); - } - case 3: { - auto demand = MinVersion.Next<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - MaxVersion.Init<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version); - } - case 5: { - auto demand = MaxVersion.Next<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TApiVersionsResponseData::TSupportedFeatureKey::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TSupportedFeatureKey::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -5903,9 +2129,6 @@ i32 TApiVersionsResponseData::TSupportedFeatureKey::Size(TKafkaVersion _version) } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TSupportedFeatureKey::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -5921,133 +2144,24 @@ TApiVersionsResponseData::TFinalizedFeatureKey::TFinalizedFeatureKey() , MinVersionLevel(MinVersionLevelMeta::Default) {} - -class TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TApiVersionsResponseData::TFinalizedFeatureKey& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TApiVersionsResponseData::TFinalizedFeatureKey& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TApiVersionsResponseData::TFinalizedFeatureKey::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TFinalizedFeatureKey"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<MaxVersionLevelMeta>(_readable, _version, MaxVersionLevel); + NPrivate::Read<MinVersionLevelMeta>(_readable, _version, MinVersionLevel); - NPrivate::TReadStrategy<NameMeta> Name; - NPrivate::TReadStrategy<MaxVersionLevelMeta> MaxVersionLevel; - NPrivate::TReadStrategy<MinVersionLevelMeta> MinVersionLevel; -}; - -TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext::TReadContext(TApiVersionsResponseData::TFinalizedFeatureKey& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , Name() - , MaxVersionLevel() - , MinVersionLevel() -{} - - -TReadDemand TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - } - case 1: { - auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - MaxVersionLevel.Init<NPrivate::ReadFieldRule<MaxVersionLevelMeta>>(Value.MaxVersionLevel, Version); - } - case 3: { - auto demand = MaxVersionLevel.Next<NPrivate::ReadFieldRule<MaxVersionLevelMeta>>(Value.MaxVersionLevel, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - MinVersionLevel.Init<NPrivate::ReadFieldRule<MinVersionLevelMeta>>(Value.MinVersionLevel, Version); - } - case 5: { - auto demand = MinVersionLevel.Next<NPrivate::ReadFieldRule<MinVersionLevelMeta>>(Value.MinVersionLevel, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - if (!NPrivate::VersionCheck<TApiVersionsResponseData::TFinalizedFeatureKey::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TFinalizedFeatureKey::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 7: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 9: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 11: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 8; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -6078,9 +2192,6 @@ i32 TApiVersionsResponseData::TFinalizedFeatureKey::Size(TKafkaVersion _version) } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TFinalizedFeatureKey::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -6098,147 +2209,25 @@ TInitProducerIdRequestData::TInitProducerIdRequestData() , ProducerEpoch(ProducerEpochMeta::Default) {} - -class TInitProducerIdRequestData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TInitProducerIdRequestData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TInitProducerIdRequestData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TInitProducerIdRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TInitProducerIdRequestData"; + } + NPrivate::Read<TransactionalIdMeta>(_readable, _version, TransactionalId); + NPrivate::Read<TransactionTimeoutMsMeta>(_readable, _version, TransactionTimeoutMs); + NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId); + NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch); - NPrivate::TReadStrategy<TransactionalIdMeta> TransactionalId; - NPrivate::TReadStrategy<TransactionTimeoutMsMeta> TransactionTimeoutMs; - NPrivate::TReadStrategy<ProducerIdMeta> ProducerId; - NPrivate::TReadStrategy<ProducerEpochMeta> ProducerEpoch; -}; - -TInitProducerIdRequestData::TReadContext::TReadContext(TInitProducerIdRequestData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , TransactionalId() - , TransactionTimeoutMs() - , ProducerId() - , ProducerEpoch() -{} - - -TReadDemand TInitProducerIdRequestData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - TransactionalId.Init<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version); - } - case 1: { - auto demand = TransactionalId.Next<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - TransactionTimeoutMs.Init<NPrivate::ReadFieldRule<TransactionTimeoutMsMeta>>(Value.TransactionTimeoutMs, Version); - } - case 3: { - auto demand = TransactionTimeoutMs.Next<NPrivate::ReadFieldRule<TransactionTimeoutMsMeta>>(Value.TransactionTimeoutMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - } - case 5: { - auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - ProducerEpoch.Init<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version); - } - case 7: { - auto demand = ProducerEpoch.Next<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TInitProducerIdRequestData::MessageMeta::FlexibleVersionMin, TInitProducerIdRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -6271,9 +2260,6 @@ i32 TInitProducerIdRequestData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TInitProducerIdRequestData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} // @@ -6291,147 +2277,25 @@ TInitProducerIdResponseData::TInitProducerIdResponseData() , ProducerEpoch(ProducerEpochMeta::Default) {} - -class TInitProducerIdResponseData::TReadContext : public NKafka::TReadContext { -public: - TReadContext(TInitProducerIdResponseData& value, TKafkaVersion version); - TReadDemand Next() override; -private: - TInitProducerIdResponseData& Value; - TKafkaVersion Version; - size_t Step; - - NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_; - NPrivate::ReadUnsignedVarintStrategy Tag_; - NPrivate::ReadUnsignedVarintStrategy TagSize_; - bool TagInitialized_; +void TInitProducerIdResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TInitProducerIdResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId); + NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch); - NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs; - NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode; - NPrivate::TReadStrategy<ProducerIdMeta> ProducerId; - NPrivate::TReadStrategy<ProducerEpochMeta> ProducerEpoch; -}; - -TInitProducerIdResponseData::TReadContext::TReadContext(TInitProducerIdResponseData& value, TKafkaVersion version) - : Value(value) - , Version(version) - , Step(0) - , ThrottleTimeMs() - , ErrorCode() - , ProducerId() - , ProducerEpoch() -{} - - -TReadDemand TInitProducerIdResponseData::TReadContext::Next() { - while(true) { - switch(Step) { - case 0: { - ++Step; - ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - } - case 1: { - auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 2: { - ++Step; - ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - } - case 3: { - auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 4: { - ++Step; - ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - } - case 5: { - auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 6: { - ++Step; - ProducerEpoch.Init<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version); - } - case 7: { - auto demand = ProducerEpoch.Next<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 8: { - if (!NPrivate::VersionCheck<TInitProducerIdResponseData::MessageMeta::FlexibleVersionMin, TInitProducerIdResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand; - ++Step; - NumTaggedFields_.Init(); - } - case 9: { - auto demand = NumTaggedFields_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 10: { - ++Step; - if (NumTaggedFields_.Value <= 0) return NoDemand; - --NumTaggedFields_.Value; - Tag_.Init(); - TagSize_.Init(); - TagInitialized_=false; - } - case 11: { - auto demand = Tag_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 12: { - auto demand = TagSize_.Next(); - if (demand) { - return demand; - } else { - ++Step; - } - } - case 13: { - TReadDemand demand; - switch(Tag_.Value) { - default: { - if (!TagInitialized_) { - TagInitialized_ = true; - demand = TReadDemand(TagSize_.Value); - } else { - demand = NoDemand; - } - } - } - if (demand) { - return demand; - } else { - Step = 10; + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag break; - } } - default: - return NoDemand; } } } @@ -6464,7 +2328,4 @@ i32 TInitProducerIdResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } -std::unique_ptr<NKafka::TReadContext> TInitProducerIdResponseData::CreateReadContext(TKafkaVersion _version) { - return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version)); -} } //namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index c070177e7f..4ca58fb3cd 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -116,11 +116,9 @@ public: }; ClientIdMeta::Type ClientId; - class TReadContext; - i16 ApiKey() const override { return HEADER; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TRequestHeaderData& other) const = default; @@ -158,11 +156,9 @@ public: }; CorrelationIdMeta::Type CorrelationId; - class TReadContext; - i16 ApiKey() const override { return HEADER; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TResponseHeaderData& other) const = default; @@ -242,10 +238,8 @@ public: }; RecordsMeta::Type Records; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TPartitionProduceData& other) const = default; @@ -290,10 +284,8 @@ public: }; PartitionDataMeta::Type PartitionData; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TTopicProduceData& other) const = default; @@ -376,11 +368,9 @@ public: }; TopicDataMeta::Type TopicData; - class TReadContext; - i16 ApiKey() const override { return PRODUCE; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TProduceRequestData& other) const = default; @@ -473,10 +463,8 @@ public: }; BatchIndexErrorMessageMeta::Type BatchIndexErrorMessage; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TBatchIndexAndErrorMessage& other) const = default; @@ -616,10 +604,8 @@ public: }; ErrorMessageMeta::Type ErrorMessage; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TPartitionProduceResponse& other) const = default; @@ -664,10 +650,8 @@ public: }; PartitionResponsesMeta::Type PartitionResponses; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TTopicProduceResponse& other) const = default; @@ -712,11 +696,9 @@ public: }; ThrottleTimeMsMeta::Type ThrottleTimeMs; - class TReadContext; - i16 ApiKey() const override { return PRODUCE; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TProduceResponseData& other) const = default; @@ -873,10 +855,8 @@ public: }; PartitionMaxBytesMeta::Type PartitionMaxBytes; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFetchPartition& other) const = default; @@ -940,10 +920,8 @@ public: }; PartitionsMeta::Type Partitions; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFetchTopic& other) const = default; @@ -1019,10 +997,8 @@ public: }; PartitionsMeta::Type Partitions; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TForgottenTopic& other) const = default; @@ -1240,11 +1216,9 @@ public: }; RackIdMeta::Type RackId; - class TReadContext; - i16 ApiKey() const override { return FETCH; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFetchRequestData& other) const = default; @@ -1337,10 +1311,8 @@ public: }; EndOffsetMeta::Type EndOffset; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TEpochEndOffset& other) const = default; @@ -1396,10 +1368,8 @@ public: }; LeaderEpochMeta::Type LeaderEpoch; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TLeaderIdAndEpoch& other) const = default; @@ -1455,10 +1425,8 @@ public: }; EpochMeta::Type Epoch; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TSnapshotId& other) const = default; @@ -1514,10 +1482,8 @@ public: }; FirstOffsetMeta::Type FirstOffset; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TAbortedTransaction& other) const = default; @@ -1732,10 +1698,8 @@ public: }; RecordsMeta::Type Records; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TPartitionData& other) const = default; @@ -1799,10 +1763,8 @@ public: }; PartitionsMeta::Type Partitions; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFetchableTopicResponse& other) const = default; @@ -1885,11 +1847,9 @@ public: }; ResponsesMeta::Type Responses; - class TReadContext; - i16 ApiKey() const override { return FETCH; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFetchResponseData& other) const = default; @@ -1958,10 +1918,8 @@ public: }; NameMeta::Type Name; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataRequestTopic& other) const = default; @@ -2044,11 +2002,9 @@ public: }; IncludeTopicAuthorizedOperationsMeta::Type IncludeTopicAuthorizedOperations; - class TReadContext; - i16 ApiKey() const override { return METADATA; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataRequestData& other) const = default; @@ -2155,10 +2111,8 @@ public: }; RackMeta::Type Rack; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataResponseBroker& other) const = default; @@ -2324,10 +2278,8 @@ public: }; OfflineReplicasMeta::Type OfflineReplicas; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataResponsePartition& other) const = default; @@ -2448,10 +2400,8 @@ public: }; TopicAuthorizedOperationsMeta::Type TopicAuthorizedOperations; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataResponseTopic& other) const = default; @@ -2573,11 +2523,9 @@ public: }; ClusterAuthorizedOperationsMeta::Type ClusterAuthorizedOperations; - class TReadContext; - i16 ApiKey() const override { return METADATA; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TMetadataResponseData& other) const = default; @@ -2634,11 +2582,9 @@ public: }; ClientSoftwareVersionMeta::Type ClientSoftwareVersion; - class TReadContext; - i16 ApiKey() const override { return API_VERSIONS; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TApiVersionsRequestData& other) const = default; @@ -2726,10 +2672,8 @@ public: }; MaxVersionMeta::Type MaxVersion; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TApiVersion& other) const = default; @@ -2804,10 +2748,8 @@ public: }; MaxVersionMeta::Type MaxVersion; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TSupportedFeatureKey& other) const = default; @@ -2882,10 +2824,8 @@ public: }; MinVersionLevelMeta::Type MinVersionLevel; - class TReadContext; - i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TFinalizedFeatureKey& other) const = default; @@ -3031,11 +2971,9 @@ public: }; ZkMigrationReadyMeta::Type ZkMigrationReady; - class TReadContext; - i16 ApiKey() const override { return API_VERSIONS; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TApiVersionsResponseData& other) const = default; @@ -3130,11 +3068,9 @@ public: }; ProducerEpochMeta::Type ProducerEpoch; - class TReadContext; - i16 ApiKey() const override { return INIT_PRODUCER_ID; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TInitProducerIdRequestData& other) const = default; @@ -3229,11 +3165,9 @@ public: }; ProducerEpochMeta::Type ProducerEpoch; - class TReadContext; - i16 ApiKey() const override { return INIT_PRODUCER_ID; }; i32 Size(TKafkaVersion version) const override; - std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; void Write(TKafkaWritable& writable, TKafkaVersion version) const override; bool operator==(const TInitProducerIdResponseData& other) const = default; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp index 9604fdaa76..01ef8ba9b8 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.cpp +++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp @@ -2,19 +2,13 @@ namespace NKafka { -void ErrorOnUnexpectedEnd(std::istream& is) { - if (is.eof()) { - ythrow yexception() << "unexpected end of stream"; - } -} - TKafkaWritable& TKafkaWritable::operator<<(const TKafkaRawBytes& val) { - Os.write(val.data(), val.size()); + write(val.data(), val.size()); return *this; } TKafkaWritable& TKafkaWritable::operator<<(const TKafkaRawString& val) { - Os.write(val.data(), val.length()); + write(val.data(), val.length()); return *this; } @@ -28,10 +22,15 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) { void TKafkaWritable::writeUnsignedVarint(TKafkaUint32 value) { while ((value & 0xffffff80) != 0L) { ui8 b = (ui8) ((value & 0x7f) | 0x80); - Os << b; + write((const char*)&b, sizeof(b)); value >>= 7; } - Os << (ui8) value; + ui8 b = (ui8) value; + write((const char*)&b, sizeof(b)); +} + +void TKafkaWritable::write(const char* val, size_t length) { + Buffer.write(val, length); } TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) { @@ -45,17 +44,30 @@ TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) { } -void TKafkaReadable::read(char* val, int length) { - Is.read(val, length); - ErrorOnUnexpectedEnd(Is); +void TKafkaReadable::read(char* val, size_t length) { + checkEof(length); + memcpy(val, Is.Data() + Position, length); + Position += length; +} + +char TKafkaReadable::get() { + char r; + read(&r, sizeof(r)); + return r; +} + +TArrayRef<const char> TKafkaReadable::Bytes(size_t length) { + checkEof(length); + TArrayRef<const char> r(Is.Data() + Position, length); + Position += length; + return r; } ui32 TKafkaReadable::readUnsignedVarint() { ui32 value = 0; ui32 i = 0; ui16 b; - while (((b = Is.get()) & 0x80) != 0) { - ErrorOnUnexpectedEnd(Is); + while (((b = get()) & 0x80) != 0) { value |= ((ui32)(b & 0x7f)) << i; i += 7; @@ -64,21 +76,19 @@ ui32 TKafkaReadable::readUnsignedVarint() { } } - ErrorOnUnexpectedEnd(Is); - value |= b << i; return value; } -void TKafkaReadable::skip(int length) { - char buffer[64]; - while (length) { - int l = std::min(length, 64); - Is.read(buffer, l); - length -= l; - } +void TKafkaReadable::skip(size_t length) { + checkEof(length); + Position += length; +} - ErrorOnUnexpectedEnd(Is); +void TKafkaReadable::checkEof(size_t length) { + if (Position + length > Is.Size()) { + ythrow yexception() << "unexpected end of stream"; + } } } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 3f02644272..fe0bbb3a86 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -234,337 +234,6 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version } -template<typename T> -void NormalizeNumber(T& value) { -#ifndef WORDS_BIGENDIAN - char* b = (char*)&value; - char* e = b + sizeof(T) - 1; - while(b < e) { - std::swap(*b, *e); - ++b; - --e; - } -#endif -} - -class ReadUnsignedVarintStrategy { -public: - void Init() { - Finished = 0; - Shift = -1; - Value = 0; - } - - TReadDemand Next() { - if (Finished) { - return NoDemand; - } - if (Shift >= 0) { - Finished = !(Buffer & 0x80); - ui32 v = Buffer & 0x7F; - Value |= v << Shift; - Shift += 7; - } else { - Shift = 0; - } - - if (Finished) { - return NoDemand; - } else { - return TReadDemand(&Buffer, sizeof(char)); - } - } - - TKafkaInt32 Value; - -private: - bool Finished; - char Buffer; - i8 Shift; -}; - - - -template<typename Meta, typename TOldSizeType> -class ReadSizeStrategy { -public: - void Init() { - WasRead = false; - WasNormalized = false; - OldValue = 0; - UnsignedVarintStrategy.Init(); - } - - TReadDemand Next(TKafkaVersion version) { - if (VersionCheck<Meta::FlexibleVersionMin, Meta::FlexibleVersionMax>(version)) { - return UnsignedVarintStrategy.Next(); - } else { - if (WasRead) { - if (!WasNormalized) { - WasNormalized = true; - NormalizeNumber(OldValue); - } - return NoDemand; - } - WasRead = true; - return TReadDemand((char*)&OldValue, sizeof(TOldSizeType)); - } - } - - TKafkaInt32 Value(TKafkaVersion version) { - if (VersionCheck<Meta::FlexibleVersionMin, Meta::FlexibleVersionMax>(version)) { - return UnsignedVarintStrategy.Value - 1; - } else { - return OldValue; - } - } - -private: - bool WasRead; - bool WasNormalized; - TOldSizeType OldValue; - - ReadUnsignedVarintStrategy UnsignedVarintStrategy; -}; - - - -template<typename Meta, - typename TValueType = typename Meta::Type, - typename TTypeDesc = typename Meta::TypeDesc> -class TReadStrategy { -public: - template<typename Rule> - void Init(TValueType& /*value*/, TKafkaVersion /*version*/) { - WasRead = false; - } - - template<typename Rule> - TReadDemand Next(TValueType& value, TKafkaVersion version) { - if (!Rule::Apply(version)) { - if constexpr (Meta::TypeDesc::Default) { - value = Meta::Default; - } - return NoDemand; - } - if (WasRead) { - NormalizeNumber(value); - return NoDemand; - } - WasRead = true; - return TReadDemand((char*)&value, sizeof(TValueType)); - } - -private: - bool WasRead; -}; - - -template<typename Meta, typename TValueType> -class TReadStrategy<Meta, TValueType, TKafkaStructDesc> { -public: - template<typename Rule> - void Init(TValueType& value, TKafkaVersion version) { - if (Rule::Apply(version)) { - Context = value.CreateReadContext(version); - } - } - - template<typename Rule> - TReadDemand Next(TValueType& /*value*/, TKafkaVersion version) { - if (Rule::Apply(version)) { - return Context.get()->Next(); - } else { - return NoDemand; - } - } - -private: - std::unique_ptr<TReadContext> Context; -}; - - -template<typename Meta, typename TValueType> -class TReadStrategy<Meta, TValueType, TKafkaUuidDesc> { -public: - template<typename Rule> - void Init(TKafkaUuid& /*value*/, TKafkaVersion /*version*/) { - WasRead = false; - } - - template<typename Rule> - TReadDemand Next(TKafkaUuid& value, TKafkaVersion version) { - if (!Rule::Apply(version)) { - return NoDemand; - } - if (WasRead) { - NormalizeNumber(Buffer[0]); - NormalizeNumber(Buffer[1]); - value = TKafkaUuid(Buffer[0], Buffer[1]); - return NoDemand; - } - WasRead = true; - return TReadDemand((char*)Buffer, sizeof(ui64) << 1); - } - -private: - bool WasRead; - ui64 Buffer[2]; -}; - - - -template<typename Meta, typename TValueType> -class TReadStrategy<Meta, TValueType, TKafkaStringDesc> { -public: - template<typename Rule> - void Init(TKafkaString& /*value*/, TKafkaVersion /*version*/) { - WasRead = false; - Size.Init(); - } - - template<typename Rule> - TReadDemand Next(TKafkaString& value, TKafkaVersion version) { - if (WasRead || !Rule::Apply(version)) { - return NoDemand; - } - auto demand = Size.Next(version); - if (demand) { - return demand; - } - WasRead = true; - TKafkaInt32 length = Size.Value(version); - - if (length < 0) { - if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) { - value = std::nullopt; - return NoDemand; - } else { - ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null"; - } - } else if (length > Max<i16>()){ - ythrow yexception() << "string field " << Meta::Name << " had invalid length " << length; - } - - value = TString(); - value->ReserveAndResize(length); - return TReadDemand((char*)value->data(), length); - } - -private: - bool WasRead; - - ReadSizeStrategy<Meta, TKafkaInt16> Size; -}; - - -template<typename Meta, typename Desc> -class TReadStrategy<Meta, TKafkaRecords, Desc> { -public: - template<typename Rule> - void Init(TKafkaRecords& /*value*/, TKafkaVersion /*version*/) { - WasRead = false; - Size.Init(); - } - - template<typename Rule> - TReadDemand Next(TKafkaRecords& value, TKafkaVersion version) { - if (WasRead || !Rule::Apply(version)) { - return NoDemand; - } - auto demand = Size.Next(version); - if (demand) { - return demand; - } - WasRead = true; - TKafkaInt32 length = Size.Value(version); - - if (length < 0) { - if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) { - value = std::nullopt; - return NoDemand; - } else { - ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null"; - } - } else if (length > MAX_RECORDS_SIZE) { - ythrow yexception() << "records fields " << Meta::Name << " has invalid length " << length; - } - - value = TKafkaRawBytes(); - value->Resize(length); - return TReadDemand(value->data(), length); - } - -private: - bool WasRead; - - ReadSizeStrategy<Meta, TKafkaInt32> Size; -}; - - -template<typename Meta, typename TValueType> -class TReadStrategy<Meta, TValueType, TKafkaArrayDesc> { -public: - template<typename Rule> - void Init(std::vector<typename Meta::ItemType>& /*value*/, TKafkaVersion /*version*/) { - ItemStep = -1; - Size.Init(); - } - - template<typename Rule> - TReadDemand Next(std::vector<typename Meta::ItemType>& value, TKafkaVersion version) { - if (!Rule::Apply(version)) { - return NoDemand; - } - auto demand = Size.Next(version); - if (demand) { - return demand; - } - if (-1 == ItemStep) { - TKafkaInt32 length = Size.Value(version); - - if (length < 0) { - if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) { - value.resize(0); - return NoDemand; - } else { - ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null"; - } - } - - value.resize(length); - if (0 == length) { - return NoDemand; - } - - ItemStep = 0; - ItemStrategy.template Init<Rule>(value[ItemStep], version); - } else if (ItemStep == (i32)value.size()) { - return NoDemand; - } - - demand = ItemStrategy.template Next<Rule>(value[ItemStep], version); - if (demand) { - return demand; - } - - ++ItemStep; - if (ItemStep == (i32)value.size()) { - return NoDemand; - } - ItemStrategy.template Init<Rule>(value[ItemStep], version); - return ItemStrategy.template Next<Rule>(value[ItemStep], version); - } - -private: - i32 ItemStep; - - ReadSizeStrategy<Meta, TKafkaInt32> Size; - TReadStrategy<Meta, typename Meta::ItemType, typename Meta::ItemTypeDesc> ItemStrategy; -}; - - - // @@ -712,9 +381,7 @@ public: ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null"; } } else { - value = TBuffer(); - value->Resize(length); - readable.read(value->data(), length); + value = readable.Bytes(length); } } diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp index ed8185637a..eeffb3d604 100644 --- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp +++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp @@ -8,86 +8,11 @@ using namespace NKafka; void Print(std::string& sb); -class TReadProcessor { -public: - TReadProcessor(std::stringstream& buffer) - : Buffer(std::istreambuf_iterator<char>(buffer), {}) - , Position(0) - { - Print(Buffer); - } - - std::string Buffer; - size_t Position; - - void Read(TMessage* msg, TKafkaVersion version) { - auto ctx = msg->CreateReadContext(version); - - - while(true) { - auto demand = ctx->Next(); - Cerr << "TReadProcessor:: demand length=" << demand.GetLength() << ", position=" << Position << ", length=" << Buffer.length() << Endl; - if (!demand) { - break; - } - - if (!(Buffer.length() >= Position + demand.GetLength())) { - EXPECT_TRUE(Buffer.length() >= Position + demand.GetLength()); - return; - } - - if (!demand.Skip()) { - memcpy(demand.GetBuffer(), Buffer.data() + Position, demand.GetLength()); - } - Position += demand.Length; - } - - EXPECT_FALSE(Position < Buffer.length()); - } -}; - -template<typename Meta> -class TFieldReadProcessor { -public: - NKafka::NPrivate::TReadStrategy<Meta> strategy; - - TFieldReadProcessor(std::stringstream& buffer) - : Buffer(std::istreambuf_iterator<char>(buffer), {}) - , Position(0) - { - Print(Buffer); - } - - std::string Buffer; - size_t Position; - - void Read(typename Meta::Type& field, TKafkaVersion version) { - strategy.template Init<NKafka::NPrivate::ReadFieldRule<Meta>>(field, version); +static constexpr size_t BUFFER_SIZE = 1 << 16; - while(true) { - auto demand = strategy.template Next<NKafka::NPrivate::ReadFieldRule<Meta>>(field, version); - Cerr << "TFieldReadProcessor:: demand length=" << demand.GetLength() << ", position=" << Position << ", length=" << Buffer.length() << Endl; - if (!demand) { - break; - } - - if (!(Buffer.length() >= Position + demand.GetLength())) { - EXPECT_TRUE(Buffer.length() >= Position + demand.GetLength()); - return; - } - - if (!demand.Skip()) { - memcpy(demand.GetBuffer(), Buffer.data() + Position, demand.GetLength()); - } - Position += demand.Length; - } - - EXPECT_FALSE(Position < Buffer.length()); - } -}; TEST(Serialization, RequestHeader) { - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TRequestHeaderData value; @@ -99,9 +24,9 @@ TEST(Serialization, RequestHeader) { TKafkaWritable writable(sb); value.Write(writable, 1); - TReadProcessor processor(sb); TRequestHeaderData result; - processor.Read(&result, 1); + TKafkaReadable readable(sb.GetBuffer()); + result.Read(readable, 1); EXPECT_EQ(result.RequestApiKey, 3); EXPECT_EQ(result.RequestApiVersion, 7); @@ -110,7 +35,7 @@ TEST(Serialization, RequestHeader) { } TEST(Serialization, ResponseHeader) { - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TResponseHeaderData value; @@ -119,15 +44,15 @@ TEST(Serialization, ResponseHeader) { TKafkaWritable writable(sb); value.Write(writable, 0); - TReadProcessor processor(sb); + TKafkaReadable readable(sb.GetBuffer()); TResponseHeaderData result; - processor.Read(&result, 0); + result.Read(readable, 0); EXPECT_EQ(result.CorrelationId, 13); } TEST(Serialization, ApiVersionsRequest) { - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TApiVersionsRequestData value; @@ -137,10 +62,9 @@ TEST(Serialization, ApiVersionsRequest) { TKafkaWritable writable(sb); value.Write(writable, 3); - - TReadProcessor processor(sb); + TKafkaReadable readable(sb.GetBuffer()); TApiVersionsRequestData result; - processor.Read(&result, 3); + result.Read(readable, 3); EXPECT_EQ(*result.ClientSoftwareName, "apache-kafka-java"); EXPECT_EQ(*result.ClientSoftwareVersion, "3.4.0"); @@ -149,7 +73,7 @@ TEST(Serialization, ApiVersionsRequest) { TEST(Serialization, ApiVersionsResponse) { TString longString = "long-string-value-0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TApiVersionsResponseData value; @@ -186,9 +110,9 @@ TEST(Serialization, ApiVersionsResponse) { TKafkaWritable writable(sb); value.Write(writable, 3); - TReadProcessor processor(sb); + TKafkaReadable readable(sb.GetBuffer()); TApiVersionsResponseData result; - processor.Read(&result, 3); + result.Read(readable, 3); EXPECT_EQ(result.ErrorCode, 7); EXPECT_EQ(result.ApiKeys.size(), 2ul); @@ -211,7 +135,7 @@ TEST(Serialization, ProduceRequest) { char data0[] = "it-is produce data message 1"; char data1[] = "it-is produce data other message 2"; - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TProduceRequestData value; @@ -222,21 +146,21 @@ TEST(Serialization, ProduceRequest) { value.TopicData[0].Name = "/it/is/some/topic/name"; value.TopicData[0].PartitionData.resize(2); value.TopicData[0].PartitionData[0].Index = 0; - value.TopicData[0].PartitionData[0].Records = { TBuffer(data0, sizeof(data0)) }; + value.TopicData[0].PartitionData[0].Records = TArrayRef(data0); value.TopicData[0].PartitionData[1].Index = 1; value.TopicData[0].PartitionData[1].Records = {}; value.TopicData[1].Name = "/it/is/other/topic/name"; value.TopicData[1].PartitionData.resize(1); value.TopicData[1].PartitionData[0].Index = 0; - value.TopicData[1].PartitionData[0].Records = { TBuffer(data1, sizeof(data1)) }; + value.TopicData[1].PartitionData[0].Records = TArrayRef(data1); TKafkaWritable writable(sb); value.Write(writable, 3); - TReadProcessor processor(sb); + TKafkaReadable readable(sb.GetBuffer()); TProduceRequestData result; - processor.Read(&result, 3); + result.Read(readable, 3); EXPECT_TRUE(result.TransactionalId); EXPECT_EQ(*result.TransactionalId, "transactional-id-value-123456" ); @@ -248,7 +172,7 @@ TEST(Serialization, ProduceRequest) { EXPECT_EQ(result.TopicData[0].PartitionData.size(), 2ul); EXPECT_EQ(result.TopicData[0].PartitionData[0].Index, 0); EXPECT_TRUE(result.TopicData[0].PartitionData[0].Records); - EXPECT_EQ(*result.TopicData[0].PartitionData[0].Records, TBuffer(data0, sizeof(data0))); + EXPECT_EQ(*result.TopicData[0].PartitionData[0].Records, TArrayRef(data0)); EXPECT_EQ(result.TopicData[0].PartitionData[1].Index, 1); EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, std::nullopt); EXPECT_TRUE(result.TopicData[1].Name); @@ -256,19 +180,19 @@ TEST(Serialization, ProduceRequest) { EXPECT_EQ(result.TopicData[1].PartitionData.size(), 1ul); EXPECT_EQ(result.TopicData[1].PartitionData[0].Index, 0); EXPECT_TRUE(result.TopicData[1].PartitionData[0].Records); - EXPECT_EQ(*result.TopicData[1].PartitionData[0].Records, TBuffer(data1, sizeof(data1))); + EXPECT_EQ(*result.TopicData[1].PartitionData[0].Records, TArrayRef(data1)); } TEST(Serialization, UnsignedVarint) { std::vector<ui32> values = {0, 1, 127, 128, 32191}; for(ui32 v : values) { - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TKafkaWritable writable(sb); - TKafkaReadable redable(sb); + TKafkaReadable readable(sb.GetBuffer()); writable.writeUnsignedVarint(v); - ui32 r = redable.readUnsignedVarint(); + ui32 r = readable.readUnsignedVarint(); EXPECT_EQ(r, v); } } @@ -277,9 +201,11 @@ TEST(Serialization, UnsignedVarint) { Meta_##Type_::Type value = Value; \ Meta_##Type_::Type result; \ \ - std::stringstream sb; \ + TWritableBuf sb(nullptr, BUFFER_SIZE); \ TKafkaWritable writable(sb); \ + TKafkaReadable readable(sb.GetBuffer()); \ \ + Y_UNUSED(readable); \ Y_UNUSED(result); \ \ NKafka::NPrivate::TWriteCollector collector; @@ -311,27 +237,41 @@ TEST(Serialization, TKafkaInt8_NotPresentVersion) { SIMPLE_HEAD(TKafkaInt8, 37); NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 0, value); - sb.get(); - EXPECT_TRUE(sb.eof()); // For version 0 value is not serializable. Stream must be empty + EXPECT_EQ(sb.Size(), (size_t)0); // For version 0 value is not serializable. Stream must be empty EXPECT_EQ(collector.NumTaggedFields, 0u); - TFieldReadProcessor<Meta_TKafkaInt8> processor(sb); - processor.Read(value, 0); - - //EXPECT_EQ(result, Meta_TKafkaInt8::Default); // For version 0 value is not serializable + NKafka::NPrivate::Read<Meta_TKafkaInt8>(readable, 0, result); + EXPECT_EQ(result, Meta_TKafkaInt8::Default); // For version 0 value is not serializable } TEST(Serialization, TKafkaInt8_PresentVersion_NotTaggedVersion) { SIMPLE_HEAD(TKafkaInt8, 37); NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 3, value); - TFieldReadProcessor<Meta_TKafkaInt8> processor(sb); - processor.Read(result, 3); + NKafka::NPrivate::Read<Meta_TKafkaInt8>(readable, 3, result); EXPECT_EQ(collector.NumTaggedFields, 0u); EXPECT_EQ(result, value); // Must read same that write } +TEST(Serialization, TKafkaInt8_PresentVersion_TaggedVersion) { + SIMPLE_HEAD(TKafkaInt8, 37); + + NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 11, value); + EXPECT_EQ(collector.NumTaggedFields, 1u); + + NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value); + + i32 tag = readable.readUnsignedVarint(); + EXPECT_EQ(tag, Meta_TKafkaInt8::Tag); + + ui32 size = readable.readUnsignedVarint(); + EXPECT_EQ(size, sizeof(TKafkaInt8)); + + NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result); + EXPECT_EQ(result, value); // Must read same that write +} + TEST(Serialization, TKafkaInt8_PresentVersion_TaggedVersion_Default) { SIMPLE_HEAD(TKafkaInt8, Meta_TKafkaInt8::Default); @@ -402,13 +342,30 @@ TEST(Serialization, TKafkaString_PresentVersion_NotTaggedVersion) { SIMPLE_HEAD(TKafkaString, { "some value" }); NKafka::NPrivate::Write<Meta_TKafkaString>(collector, writable, 3, value); - TFieldReadProcessor<Meta_TKafkaString> processor(sb); - processor.Read(result, 3); + NKafka::NPrivate::Read<Meta_TKafkaString>(readable, 3, result); EXPECT_EQ(collector.NumTaggedFields, 0u); EXPECT_EQ(result, value); // Must read same that write } +TEST(Serialization, TKafkaString_PresentVersion_TaggedVersion) { + SIMPLE_HEAD(TKafkaString, { "some value" }); + + NKafka::NPrivate::Write<Meta_TKafkaString>(collector, writable, 11, value); + EXPECT_EQ(collector.NumTaggedFields, 1u); + + NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value); + + i32 tag = readable.readUnsignedVarint(); + EXPECT_EQ(tag, Meta_TKafkaString::Tag); + + ui32 size = readable.readUnsignedVarint(); + EXPECT_EQ(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0 + + NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result); + EXPECT_EQ(result, value); // Must read same that write +} + TEST(Serialization, TKafkaString_PresentVersion_TaggedVersion_Default) { SIMPLE_HEAD(TKafkaInt8, Meta_TKafkaInt8::Default); @@ -449,13 +406,34 @@ TEST(Serialization, TKafkaArray_PresentVersion_NotTaggedVersion) { SIMPLE_HEAD(TKafkaArray, { "some value" }); NKafka::NPrivate::Write<Meta_TKafkaArray>(collector, writable, 3, value); - TFieldReadProcessor<Meta_TKafkaArray> processor(sb); - processor.Read(result, 3); + NKafka::NPrivate::Read<Meta_TKafkaArray>(readable, 3, result); EXPECT_EQ(collector.NumTaggedFields, 0u); EXPECT_EQ(result, value); // Must read same that write } +TEST(Serialization, TKafkaArray_PresentVersion_TaggedVersion) { + TString v = "some value"; + SIMPLE_HEAD(TKafkaArray, { v }); + + NKafka::NPrivate::Write<Meta_TKafkaArray>(collector, writable, 11, value); + EXPECT_EQ(collector.NumTaggedFields, 1u); + + NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value); + + i32 tag = readable.readUnsignedVarint(); + EXPECT_EQ(tag, Meta_TKafkaArray::Tag); + + ui32 size = readable.readUnsignedVarint(); + EXPECT_EQ(size, v.length() // array element data + + NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size + + NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length + ); + + NKafka::NPrivate::ReadTag<Meta_TKafkaArray>(readable, 11, result); + EXPECT_EQ(result, value); // Must read same that write +} + TEST(Serialization, TKafkaArray_PresentVersion_TaggedVersion_Default) { SIMPLE_HEAD(TKafkaArray, {}); @@ -487,20 +465,43 @@ TEST(Serialization, TKafkaBytes_IsDefault) { Meta_TKafkaBytes::Type value; EXPECT_TRUE(NKafka::NPrivate::IsDefaultValue<Meta_TKafkaBytes>(value)); // value is std::nullopt - value = TBuffer(); - value->Resize(10); + char v[] = "value"; + value = TArrayRef<char>(v); EXPECT_FALSE(NKafka::NPrivate::IsDefaultValue<Meta_TKafkaBytes>(value)); // value is not null } TEST(Serialization, TKafkaBytes_PresentVersion_NotTaggedVersion) { - SIMPLE_HEAD(TKafkaBytes, TBuffer("0123456789", 10)); + char v[] = "0123456789"; + SIMPLE_HEAD(TKafkaBytes, TArrayRef(v)); NKafka::NPrivate::Write<Meta_TKafkaBytes>(collector, writable, 3, value); - TFieldReadProcessor<Meta_TKafkaBytes> processor(sb); - processor.Read(result, 3); + NKafka::NPrivate::Read<Meta_TKafkaBytes>(readable, 3, result); EXPECT_EQ(collector.NumTaggedFields, 0u); - EXPECT_EQ(result, value); // Must read same that write + EXPECT_EQ(result->size(), value->size()); + EXPECT_STREQ(result->begin(), value->begin()); // Must read same that write +} + +TEST(Serialization, TKafkaBytes_PresentVersion_TaggedVersion) { + char v[] = "0123456789"; + SIMPLE_HEAD(TKafkaBytes, TArrayRef(v)); + + NKafka::NPrivate::Write<Meta_TKafkaBytes>(collector, writable, 11, value); + EXPECT_EQ(collector.NumTaggedFields, 1u); + + NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value); + + i32 tag = readable.readUnsignedVarint(); + EXPECT_EQ(tag, Meta_TKafkaArray::Tag); + + ui32 size = readable.readUnsignedVarint(); + EXPECT_EQ(size, value->size() // byffer data + + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0 + ); + + NKafka::NPrivate::ReadTag<Meta_TKafkaBytes>(readable, 11, result); + EXPECT_EQ(result->size(), value->size()); + EXPECT_STREQ(result->begin(), value->begin()); // Must read same that write } TEST(Serialization, TKafkaBytes_PresentVersion_TaggedVersion_Default) { @@ -516,8 +517,9 @@ TEST(Serialization, TRequestHeaderData_reference) { ui8 reference[] = {0x00, 0x03, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0D, 0x00, 0x10, 0x63, 0x6C, 0x69, 0x65, 0x6E, 0x74, 0x2D, 0x69, 0x64, 0x2D, 0x73, 0x74, 0x72, 0x69, 0x6E, 0x67, 0x00}; - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TKafkaWritable writable(sb); + TKafkaReadable readable(sb.GetBuffer()); TRequestHeaderData value; value.RequestApiKey = 3; @@ -527,16 +529,14 @@ TEST(Serialization, TRequestHeaderData_reference) { value.Write(writable, 2); - for(ui8 r : reference) { - ui8 v = sb.get(); - EXPECT_EQ(v, r); + EXPECT_EQ(sb.Size(), sizeof(reference)); + for(size_t i = 0; i < sizeof(reference); ++i) { + EXPECT_EQ(*(sb.Data() + i), reference[i]); } - sb.write((char*)reference, sizeof(reference)); - TReadProcessor processor(sb); TRequestHeaderData result; - processor.Read(&result, 2); + result.Read(readable, 2); EXPECT_EQ(result.RequestApiKey, 3); EXPECT_EQ(result.RequestApiVersion, 7); @@ -572,16 +572,14 @@ TEST(Serialization, TKafkaFloat64_PresentVersion_NotTaggedVersion) { SIMPLE_HEAD(TKafkaFloat64, 3.1415); NKafka::NPrivate::Write<Meta_TKafkaFloat64>(collector, writable, 3, value); - TFieldReadProcessor<Meta_TKafkaFloat64> processor(sb); - processor.Read(result, 3); + NKafka::NPrivate::Read<Meta_TKafkaFloat64>(readable, 3, result); EXPECT_EQ(collector.NumTaggedFields, 0u); EXPECT_EQ(result, value); // Must read same that write - NKafka::NPrivate::Write<Meta_TKafkaFloat64>(collector, writable, 3, value); - for(ui8 r : reference) { - ui8 v = sb.get(); - EXPECT_EQ(v, r); + EXPECT_EQ(sb.Size(), sizeof(reference)); + for(size_t i = 0; i < sizeof(reference); ++i) { + EXPECT_EQ(*(sb.Data() + i), (char)reference[i]); } } @@ -589,12 +587,12 @@ TEST(Serialization, RequestHeader_reference) { ui8 reference[] = {0x00, 0x12, 0x00, 0x00, 0x7F, 0x6F, 0x6F, 0x68, 0x00, 0x0A, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, 0x65, 0x72, 0x2D, 0x31}; - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); sb.write((char*)reference, sizeof(reference)); - TReadProcessor processor(sb); + TKafkaReadable readable(sb.GetBuffer()); TRequestHeaderData result; - processor.Read(&result, 1); + result.Read(readable, 1); EXPECT_EQ(result.RequestApiKey, 0x12); EXPECT_EQ(result.RequestApiVersion, 0x00); @@ -611,22 +609,25 @@ TEST(Serialization, ProduceRequestData_reference) { 0x6B, 0x61, 0x2D, 0x62, 0x79, 0x74, 0x65, 0x73, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0D, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6F, 0x6E, 0x2D, 0x32, 0x33, 0x01, 0x00, 0x00}; - std::stringstream sb; + TWritableBuf sb(nullptr, BUFFER_SIZE); TKafkaWritable writable(sb); - TKafkaReadable readable(sb); + TKafkaReadable readable(sb.GetBuffer()); TProduceRequestData value; value.Acks = 3; value.TimeoutMs = 5; value.TransactionalId = "7"; + char record0[] = "record-13-it-is-kafka-bytes"; + char record1[] = "record-17-it-is-kafka-bytes"; + value.TopicData.resize(2); value.TopicData[0].Name = "partition-11"; value.TopicData[0].PartitionData.resize(3); value.TopicData[0].PartitionData[0].Index = 13; - value.TopicData[0].PartitionData[0].Records = TKafkaRawBytes("record-13-it-is-kafka-bytes", 27); + value.TopicData[0].PartitionData[0].Records = TKafkaRawBytes(record0, 27); value.TopicData[0].PartitionData[1].Index = 17; - value.TopicData[0].PartitionData[1].Records = TKafkaRawBytes("record-17-it-is-kafka-bytes", 27); + value.TopicData[0].PartitionData[1].Records = TKafkaRawBytes(record1, 27); value.TopicData[1].Name = "partition-23"; @@ -634,16 +635,8 @@ TEST(Serialization, ProduceRequestData_reference) { // Print(sb); - for(ui8 r : reference) { - ui8 v = sb.get(); - EXPECT_EQ(v, r); - } - - sb.write((char*)reference, sizeof(reference)); - - TReadProcessor processor(sb); TProduceRequestData result; - processor.Read(&result, 9); + result.Read(readable, 9); EXPECT_EQ(result.Acks, 3); EXPECT_EQ(result.TimeoutMs, 5); @@ -653,14 +646,19 @@ TEST(Serialization, ProduceRequestData_reference) { EXPECT_EQ(result.TopicData[0].Name, "partition-11"); EXPECT_EQ(result.TopicData[0].PartitionData.size(), 3ul); EXPECT_EQ(result.TopicData[0].PartitionData[0].Index, 13); - EXPECT_EQ(result.TopicData[0].PartitionData[0].Records, TKafkaRawBytes("record-13-it-is-kafka-bytes", 27)); + EXPECT_EQ(result.TopicData[0].PartitionData[0].Records, TKafkaRawBytes(record0, 27)); EXPECT_EQ(result.TopicData[0].PartitionData[1].Index, 17); - EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, TKafkaRawBytes("record-17-it-is-kafka-bytes", 27)); + EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, TKafkaRawBytes(record1, 27)); EXPECT_EQ(result.TopicData[0].PartitionData[2].Index, 0); EXPECT_EQ(result.TopicData[0].PartitionData[2].Records, std::nullopt); EXPECT_EQ(result.TopicData[1].Name, "partition-23"); EXPECT_EQ(result.TopicData[1].PartitionData.size(), 0ul); + + EXPECT_EQ(sb.Size(), sizeof(reference)); + for(size_t i = 0; i < sizeof(reference); ++i) { + EXPECT_EQ(*(sb.Data() + i), (char)reference[i]); + } } char Hex(const unsigned char c) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3c3fa68ae5..cbe8673356 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1795,6 +1795,10 @@ message TKafkaProxyConfig { optional int32 ListeningPort = 2 [default = 9092]; optional string SslCertificate = 3; + + optional uint64 MaxMessageSize = 4 [default = 268435456]; + optional uint64 MaxInflightSize = 5 [default = 268435456]; + optional uint64 PacketSize = 6 [default = 1500]; } message TAwsCompatibilityConfig { diff --git a/ydb/core/raw_socket/sock_impl.h b/ydb/core/raw_socket/sock_impl.h index 632acaa1cf..5389f0575b 100644 --- a/ydb/core/raw_socket/sock_impl.h +++ b/ydb/core/raw_socket/sock_impl.h @@ -98,4 +98,51 @@ public: } }; +class TBufferedWriter { +public: + TBufferedWriter(TSocketDescriptor* socket, size_t size) + : Socket(socket) + , Buffer(size) { + } + + void write(const char* src, size_t length) { + size_t possible = std::min(length, Buffer.Avail()); + if (possible > 0) { + Buffer.Append(src, possible); + } + if (0 == Buffer.Avail()) { + flush(); + } + size_t left = length - possible; + if (left > Buffer.Size()) { + Socket->Send(src + possible, left); + } else if (left > 0) { + Buffer.Append(src + possible, left); + } + } + + void flush() { + if (Buffer.Size() > 0) { + Socket->Send(Buffer.Data(), Buffer.Size()); + Buffer.Clear(); + } + } + + const char* Data() { + return Buffer.Data(); + } + + const TBuffer& GetBuffer() { + return Buffer; + } + + size_t Size() { + return Buffer.Size(); + } + +private: + TSocketDescriptor* Socket; + TBuffer Buffer; +}; + } // namespace NKikimr::NRawSocket |