aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-06-23 15:38:51 +0300
committertesseract <tesseract@yandex-team.com>2023-06-23 15:38:51 +0300
commit946e3fceb61ca8aea25cc61c39c3d34a60c508fe (patch)
tree3ee102f4f801da842b2bf44d5ab7baabf7d8f136
parentd37449771450cd5e4c8fce14ee366a68da87dcb8 (diff)
downloadydb-946e3fceb61ca8aea25cc61c39c3d34a60c508fe.tar.gz
Kafka protocol - limit message size
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kafka_proxy/kafka.h112
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp234
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.h8
-rw-r--r--ydb/core/kafka_proxy/kafka_listener.h6
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp5255
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.h132
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.cpp60
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h335
-rw-r--r--ydb/core/kafka_proxy/ut/ut_serialization.cpp296
-rw-r--r--ydb/core/protos/config.proto4
-rw-r--r--ydb/core/raw_socket/sock_impl.h47
12 files changed, 1003 insertions, 5488 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 91b339aa37..24672d25ae 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2871,7 +2871,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
setup->LocalServices.emplace_back(
TActorId(),
- TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings),
+ TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),
TMailboxType::HTSwap, appData->UserPoolId)
);
}
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 8e4f5651b2..24429079cc 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -4,13 +4,13 @@
#include <optional>
#include <ostream>
+#include <ydb/core/raw_socket/sock_impl.h>
#include <ydb/library/yql/public/decimal/yql_wide_int.h>
#include <util/generic/buffer.h>
#include <util/generic/strbuf.h>
#include <util/system/types.h>
-
namespace NKafka {
/*
@@ -48,30 +48,36 @@ using TKafkaUuid = NYql::TWide<ui64>;
using TKafkaFloat64 = double;
using TKafkaRawString = TString;
using TKafkaString = std::optional<TKafkaRawString>;
-using TKafkaRawBytes = TBuffer;
+using TKafkaRawBytes = TArrayRef<const char>;
using TKafkaBytes = std::optional<TKafkaRawBytes>;
using TKafkaRecords = std::optional<TKafkaRawBytes>;
-
using TKafkaVersion = i16;
-
-void ErrorOnUnexpectedEnd(std::istream& is);
+using TWritableBuf = NKikimr::NRawSocket::TBufferedWriter;
+
+template <typename T>
+void NormalizeNumber(T& value) {
+#ifndef WORDS_BIGENDIAN
+ char* b = (char*)&value;
+ char* e = b + sizeof(T) - 1;
+ while (b < e) {
+ std::swap(*b, *e);
+ ++b;
+ --e;
+ }
+#endif
+}
class TKafkaWritable {
public:
- TKafkaWritable(std::ostream& os) : Os(os) {};
+ TKafkaWritable(TWritableBuf& buffer)
+ : Buffer(buffer){};
- template<typename T>
+ template <typename T>
TKafkaWritable& operator<<(const T val) {
- char* v = (char*)&val;
-#ifdef WORDS_BIGENDIAN
- Os.write(v, sizeof(T));
-#else
- for(i8 i = sizeof(T) - 1; 0 <= i; --i) {
- Os.write(v + i, sizeof(char));
- }
-#endif
+ NormalizeNumber(val);
+ write((const char*)&val, sizeof(T));
return *this;
};
@@ -82,62 +88,71 @@ public:
void writeUnsignedVarint(TKafkaUint32 val);
void writeVarint(TKafkaInt32 val);
void writeVarint(TKafkaInt64 val);
+ void write(const char* val, size_t length);
private:
- std::ostream& Os;
+ TWritableBuf& Buffer;
};
class TKafkaReadable {
public:
- TKafkaReadable(std::istream& is): Is(is) {};
+ TKafkaReadable(const TBuffer& is)
+ : Is(is)
+ , Position(0) {
+ }
- template<typename T>
+ template <typename T>
TKafkaReadable& operator>>(T& val) {
char* v = (char*)&val;
-#ifdef WORDS_BIGENDIAN
- Is.read(v, sizeof(T));
-#else
- for(i8 i = sizeof(T) - 1; 0 <= i; --i) {
- Is.read(v + i, sizeof(char));
- }
-#endif
- ErrorOnUnexpectedEnd(Is);
+ read(v, sizeof(T));
+ NormalizeNumber(val);
return *this;
};
TKafkaReadable& operator>>(TKafkaUuid& val);
- void read(char* val, int length);
+ void read(char* val, size_t length);
+ char get();
ui32 readUnsignedVarint();
+ TArrayRef<const char> Bytes(size_t length);
- void skip(int length);
+ void skip(size_t length);
private:
- std::istream& Is;
-};
+ void checkEof(size_t length);
+ const TBuffer& Is;
+ size_t Position;
+};
struct TReadDemand {
constexpr TReadDemand()
: Buffer(nullptr)
- , Length(0)
- {}
+ , Length(0) {
+ }
constexpr TReadDemand(char* buffer, size_t length)
: Buffer(buffer)
- , Length(length)
- {}
+ , Length(length) {
+ }
constexpr TReadDemand(size_t length)
: Buffer(nullptr)
- , Length(length)
- {}
-
- char* GetBuffer() const { return Buffer; }
- size_t GetLength() const { return Length; }
- explicit operator bool() const { return 0 < Length; }
- bool Skip() const { return nullptr == Buffer; }
-
+ , Length(length) {
+ }
+
+ char* GetBuffer() const {
+ return Buffer;
+ }
+ size_t GetLength() const {
+ return Length;
+ }
+ explicit operator bool() const {
+ return 0 < Length;
+ }
+ bool Skip() const {
+ return nullptr == Buffer;
+ }
char* Buffer;
size_t Length;
@@ -145,33 +160,24 @@ struct TReadDemand {
static constexpr TReadDemand NoDemand;
-class TReadContext {
-public:
- virtual ~TReadContext() = default;
- virtual TReadDemand Next() = 0;
-};
-
-
class TMessage {
public:
virtual ~TMessage() = default;
virtual i32 Size(TKafkaVersion version) const = 0;
- //virtual void Read(TKafkaReadable& readable, TKafkaVersion version) = 0;
+ virtual void Read(TKafkaReadable& readable, TKafkaVersion version) = 0;
virtual void Write(TKafkaWritable& writable, TKafkaVersion version) const = 0;
- virtual std::unique_ptr<TReadContext> CreateReadContext(TKafkaVersion version) = 0;
bool operator==(const TMessage& other) const = default;
};
-class TApiMessage : public TMessage {
+class TApiMessage: public TMessage {
public:
~TApiMessage() = default;
virtual i16 ApiKey() const = 0;
};
-
std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey);
std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey);
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index f9934a1a8d..44c507b88f 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -33,14 +33,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
public:
using TBase = TActorBootstrapped<TKafkaConnection>;
- // TODO check standard ip packet MTU. On my desktop it is 1500 on eth and wlp interfaces. It is 1300 on the tun interface. It
- // is 1500 and 8950 on the dev server interfaces.
- static constexpr size_t BufferSize = 8950;
- static constexpr size_t MinDirectSize = 256;
-
struct Msg {
size_t Size = 0;
TKafkaInt32 ExpectedSize = 0;
+ TBuffer Buffer;
TRequestHeaderData Header;
std::unique_ptr<TMessage> Message;
};
@@ -51,6 +47,8 @@ public:
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
+ const NKikimrConfig::TKafkaProxyConfig& Config;
+
THPTimer InactivityTimer;
bool IsAuthRequired = true;
@@ -59,32 +57,25 @@ public:
bool ConnectionEstablished = false;
bool CloseConnection = false;
- TBuffer Buffer;
- size_t Length;
- size_t Position;
-
Msg Request;
- bool HeaderSizeWasRead;
- bool HeaderWasRead;
- bool MessageSizeWasRead;
- bool MessageWasRead;
- std::unique_ptr<TReadContext> Ctx;
+
+ enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS };
+ EReadSteps Step;
TReadDemand Demand;
- TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)
+ size_t InflightSize;
+
+ TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+ const NKikimrConfig::TKafkaProxyConfig& config)
: Socket(std::move(socket))
, Address(address)
- , Length(0)
- , Position(0)
- , HeaderSizeWasRead(false)
- , HeaderWasRead(false)
- , MessageSizeWasRead(false)
- , MessageWasRead(false)
- , Demand(NoDemand) {
+ , Config(config)
+ , Step(SIZE_READ)
+ , Demand(NoDemand)
+ , InflightSize(0) {
SetNonBlock();
IsSslSupported = IsSslSupported && Socket->IsSslSupported();
- Buffer.Resize(BufferSize);
}
void Bootstrap() {
@@ -155,7 +146,7 @@ protected:
}
}
- void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/) {
+ void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/, size_t messageSize) {
TApiVersionsResponseData response;
response.ApiKeys.resize(4);
@@ -176,9 +167,11 @@ protected:
response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMax;
Reply(header, &response);
+
+ InflightSize -= messageSize;
}
- void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message) {
+ void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message, size_t messageSize) {
TProduceResponseData response;
response.Responses.resize(message->TopicData.size());
int i = 0;
@@ -196,9 +189,11 @@ protected:
}
Reply(header, &response);
+
+ InflightSize -= messageSize;
}
- void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/) {
+ void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/, size_t messageSize) {
TInitProducerIdResponseData response;
response.ProducerEpoch = 1;
response.ProducerId = 1;
@@ -206,9 +201,11 @@ protected:
response.ThrottleTimeMs = 0;
Reply(header, &response);
+
+ InflightSize -= messageSize;
}
- void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/) {
+ void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/, size_t messageSize) {
TMetadataResponseData response;
response.ThrottleTimeMs = 0;
response.ClusterId = "cluster-ahjgk";
@@ -230,6 +227,8 @@ protected:
response.Topics[0].Partitions[0].IsrNodes[0] = 1;
Reply(header, &response);
+
+ InflightSize -= messageSize;
}
void ProcessRequest() {
@@ -237,19 +236,19 @@ protected:
<< ", Size=" << Request.Size);
switch (Request.Header.RequestApiKey) {
case PRODUCE:
- HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()));
+ HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()), Request.ExpectedSize);
return;
case API_VERSIONS:
- HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()));
+ HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()), Request.ExpectedSize);
return;
case INIT_PRODUCER_ID:
- HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()));
+ HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()), Request.ExpectedSize);
return;
case METADATA:
- HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()));
+ HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()), Request.ExpectedSize);
return;
default:
@@ -267,115 +266,101 @@ protected:
TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version);
- std::stringstream sb;
- TKafkaWritable writable(sb);
+ TBufferedWriter buffer(Socket.Get(), Config.GetPacketSize());
+ TKafkaWritable writable(buffer);
+
writable << size;
responseHeader.Write(writable, headerVersion);
reply->Write(writable, version);
- TBuffer b;
- b.Reserve(size + sizeof(size));
- sb.read(b.data(), size + sizeof(size));
-
- Print("sent", b, size + sizeof(size));
-
- SocketSend(b.Data(), size + sizeof(size));
+ buffer.flush();
}
void DoRead() {
for (;;) {
while (Demand) {
ssize_t received = 0;
- if (Position < Length) {
- KAFKA_LOG_T("Read from buffer: Position=" << Position << ", Length=" << Length
- << ", Demand=" << Demand.GetLength());
- received = std::min(Demand.Length, Length - Position);
- if (!Demand.Skip()) {
- memcpy(Demand.Buffer, Buffer.Data() + Position, received);
- }
- Position += received;
- } else if (!Demand.Skip() && Demand.Length >= MinDirectSize) {
- ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength());
- if (-res == EAGAIN || -res == EWOULDBLOCK) {
- return;
- } else if (-res == EINTR) {
- continue;
- } else if (!res) {
- KAFKA_LOG_ERROR("connection closed");
- return PassAway();
- } else if (res < 0) {
- KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
- return PassAway();
- }
- received = res;
- if (!received) {
- return;
- }
- } else {
- Position = 0;
- Length = 0;
- ssize_t res = SocketReceive(Buffer.Data(), BufferSize);
- if (-res == EAGAIN || -res == EWOULDBLOCK) {
- return;
- } else if (-res == EINTR) {
- continue;
- } else if (!res) {
- KAFKA_LOG_ERROR("connection closed");
- return PassAway();
- } else if (res < 0) {
- KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
- return PassAway();
- }
- Length = res;
- Print("received", Buffer, Length);
- if (!Length) {
- return;
- }
-
+ ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength());
+ if (-res == EAGAIN || -res == EWOULDBLOCK) {
+ return;
+ } else if (-res == EINTR) {
continue;
+ } else if (!res) {
+ KAFKA_LOG_ERROR("connection closed");
+ return PassAway();
+ } else if (res < 0) {
+ KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
+ return PassAway();
+ }
+ received = res;
+ if (!received) {
+ return;
}
Request.Size += received;
Demand.Buffer += received;
Demand.Length -= received;
}
- if (Ctx) {
- Demand = Ctx->Next();
- }
if (!Demand) {
- if (MessageWasRead) {
- HeaderSizeWasRead = false;
- MessageWasRead = false;
- HeaderWasRead = false;
-
- ProcessRequest();
-
- Request = Msg();
- Ctx = nullptr;
- }
- if (!HeaderSizeWasRead) {
- Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize));
- HeaderSizeWasRead = true;
- Ctx = nullptr;
- } else if (!HeaderWasRead) {
- NPrivate::NormalizeNumber(Request.ExpectedSize);
-
- KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize);
-
- Ctx = Request.Header.CreateReadContext(2);
-
- HeaderWasRead = true;
- } else {
- KAFKA_LOG_T("received header. ApiKey=" << Request.Header.RequestApiKey
- << ", Version=" << Request.Header.RequestApiVersion);
-
- i16 apiKey = Request.Header.RequestApiKey;
- TKafkaVersion version = Request.Header.RequestApiVersion;
-
- Request.Message = CreateRequest(apiKey);
- Ctx = Request.Message->CreateReadContext(version);
-
- MessageWasRead = true;
+ switch (Step) {
+ case SIZE_READ:
+ Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize));
+ Step = SIZE_PREPARE;
+ break;
+
+ case SIZE_PREPARE:
+ NormalizeNumber(Request.ExpectedSize);
+ if ((ui64)Request.ExpectedSize > Config.GetMaxMessageSize()) {
+ KAFKA_LOG_ERROR("message is big. Size: " << Request.ExpectedSize);
+ return PassAway();
+ }
+ Step = INFLIGTH_CHECK;
+
+ case INFLIGTH_CHECK:
+ if (InflightSize + Request.ExpectedSize > Config.GetMaxInflightSize()) {
+ return;
+ }
+ InflightSize += Request.ExpectedSize;
+ Step = MESSAGE_READ;
+
+ case MESSAGE_READ:
+ KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize);
+
+ Request.Buffer.Resize(Request.ExpectedSize);
+ Demand = TReadDemand(Request.Buffer.Data(), Request.ExpectedSize);
+
+ Step = MESSAGE_PROCESS;
+ break;
+
+ case MESSAGE_PROCESS:
+ TKafkaInt16 apiKey = *(TKafkaInt16*)Request.Buffer.Data();
+ TKafkaVersion apiVersion = *(TKafkaVersion*)(Request.Buffer.Data() + sizeof(TKafkaInt16));
+
+ NormalizeNumber(apiKey);
+ NormalizeNumber(apiVersion);
+
+ KAFKA_LOG_D("received message. ApiKey=" << Request.Header.RequestApiKey
+ << ", Version=" << Request.Header.RequestApiVersion);
+
+ // Print("received", Request.Buffer, Request.ExpectedSize);
+
+ TKafkaReadable readable(Request.Buffer);
+
+ Request.Message = CreateRequest(apiKey);
+ try {
+ Request.Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion));
+ Request.Message->Read(readable, apiVersion);
+ } catch(const yexception& e) {
+ KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request.Header.RequestApiKey
+ << ", Version=" << Request.Header.RequestApiVersion
+ << ", Error=" << e.what());
+ return PassAway();
+ }
+
+ ProcessRequest();
+
+ Step = SIZE_READ;
+ break;
}
}
}
@@ -420,8 +405,9 @@ protected:
}
};
-NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
- return new TKafkaConnection(std::move(socket), std::move(address));
+NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+ const NKikimrConfig::TKafkaProxyConfig& config) {
+ return new TKafkaConnection(std::move(socket), std::move(address), config);
}
} // namespace NKafka \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h
index e23cf9995f..acece5b24e 100644
--- a/ydb/core/kafka_proxy/kafka_connection.h
+++ b/ydb/core/kafka_proxy/kafka_connection.h
@@ -1,13 +1,15 @@
#pragma once
#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/protos/config.pb.h>
#include <ydb/core/raw_socket/sock_config.h>
#include <ydb/core/raw_socket/sock_impl.h>
namespace NKafka {
-using namespace NKikimr::NRawSocket;
+using namespace NKikimr::NRawSocket;
-NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address);
+NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+ const NKikimrConfig::TKafkaProxyConfig& config);
-}
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h
index a5a36cfc3c..d5dde3ad41 100644
--- a/ydb/core/kafka_proxy/kafka_listener.h
+++ b/ydb/core/kafka_proxy/kafka_listener.h
@@ -7,11 +7,11 @@ namespace NKafka {
using namespace NKikimr::NRawSocket;
-inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings = {.Port = 9092}) {
+inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) {
return CreateSocketListener(
poller, settings,
- [](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
- return CreateKafkaConnection(socket, address);
+ [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return CreateKafkaConnection(socket, address, config);
},
NKikimrServices::EServiceKikimr::KAFKA_PROXY);
}
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 7c45f6719c..f4df5cef28 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -133,147 +133,25 @@ TRequestHeaderData::TRequestHeaderData()
, ClientId(ClientIdMeta::Default)
{}
-
-class TRequestHeaderData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TRequestHeaderData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TRequestHeaderData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TRequestHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TRequestHeaderData";
+ }
+ NPrivate::Read<RequestApiKeyMeta>(_readable, _version, RequestApiKey);
+ NPrivate::Read<RequestApiVersionMeta>(_readable, _version, RequestApiVersion);
+ NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId);
+ NPrivate::Read<ClientIdMeta>(_readable, _version, ClientId);
- NPrivate::TReadStrategy<RequestApiKeyMeta> RequestApiKey;
- NPrivate::TReadStrategy<RequestApiVersionMeta> RequestApiVersion;
- NPrivate::TReadStrategy<CorrelationIdMeta> CorrelationId;
- NPrivate::TReadStrategy<ClientIdMeta> ClientId;
-};
-
-TRequestHeaderData::TReadContext::TReadContext(TRequestHeaderData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , RequestApiKey()
- , RequestApiVersion()
- , CorrelationId()
- , ClientId()
-{}
-
-
-TReadDemand TRequestHeaderData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- RequestApiKey.Init<NPrivate::ReadFieldRule<RequestApiKeyMeta>>(Value.RequestApiKey, Version);
- }
- case 1: {
- auto demand = RequestApiKey.Next<NPrivate::ReadFieldRule<RequestApiKeyMeta>>(Value.RequestApiKey, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- RequestApiVersion.Init<NPrivate::ReadFieldRule<RequestApiVersionMeta>>(Value.RequestApiVersion, Version);
- }
- case 3: {
- auto demand = RequestApiVersion.Next<NPrivate::ReadFieldRule<RequestApiVersionMeta>>(Value.RequestApiVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- CorrelationId.Init<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version);
- }
- case 5: {
- auto demand = CorrelationId.Next<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- ClientId.Init<NPrivate::ReadFieldRule<ClientIdMeta>>(Value.ClientId, Version);
- }
- case 7: {
- auto demand = ClientId.Next<NPrivate::ReadFieldRule<ClientIdMeta>>(Value.ClientId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TRequestHeaderData::MessageMeta::FlexibleVersionMin, TRequestHeaderData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -306,9 +184,6 @@ i32 TRequestHeaderData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TRequestHeaderData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -320,105 +195,22 @@ TResponseHeaderData::TResponseHeaderData()
: CorrelationId(CorrelationIdMeta::Default)
{}
-
-class TResponseHeaderData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TResponseHeaderData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TResponseHeaderData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TResponseHeaderData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TResponseHeaderData";
+ }
+ NPrivate::Read<CorrelationIdMeta>(_readable, _version, CorrelationId);
- NPrivate::TReadStrategy<CorrelationIdMeta> CorrelationId;
-};
-
-TResponseHeaderData::TReadContext::TReadContext(TResponseHeaderData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , CorrelationId()
-{}
-
-
-TReadDemand TResponseHeaderData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- CorrelationId.Init<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version);
- }
- case 1: {
- auto demand = CorrelationId.Next<NPrivate::ReadFieldRule<CorrelationIdMeta>>(Value.CorrelationId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- if (!NPrivate::VersionCheck<TResponseHeaderData::MessageMeta::FlexibleVersionMin, TResponseHeaderData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 3: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 5: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 7: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 4;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -445,9 +237,6 @@ i32 TResponseHeaderData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TResponseHeaderData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -463,147 +252,25 @@ TProduceRequestData::TProduceRequestData()
, TimeoutMs(TimeoutMsMeta::Default)
{}
-
-class TProduceRequestData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceRequestData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceRequestData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData";
+ }
+ NPrivate::Read<TransactionalIdMeta>(_readable, _version, TransactionalId);
+ NPrivate::Read<AcksMeta>(_readable, _version, Acks);
+ NPrivate::Read<TimeoutMsMeta>(_readable, _version, TimeoutMs);
+ NPrivate::Read<TopicDataMeta>(_readable, _version, TopicData);
- NPrivate::TReadStrategy<TransactionalIdMeta> TransactionalId;
- NPrivate::TReadStrategy<AcksMeta> Acks;
- NPrivate::TReadStrategy<TimeoutMsMeta> TimeoutMs;
- NPrivate::TReadStrategy<TopicDataMeta> TopicData;
-};
-
-TProduceRequestData::TReadContext::TReadContext(TProduceRequestData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , TransactionalId()
- , Acks()
- , TimeoutMs()
- , TopicData()
-{}
-
-
-TReadDemand TProduceRequestData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- TransactionalId.Init<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version);
- }
- case 1: {
- auto demand = TransactionalId.Next<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Acks.Init<NPrivate::ReadFieldRule<AcksMeta>>(Value.Acks, Version);
- }
- case 3: {
- auto demand = Acks.Next<NPrivate::ReadFieldRule<AcksMeta>>(Value.Acks, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- TimeoutMs.Init<NPrivate::ReadFieldRule<TimeoutMsMeta>>(Value.TimeoutMs, Version);
- }
- case 5: {
- auto demand = TimeoutMs.Next<NPrivate::ReadFieldRule<TimeoutMsMeta>>(Value.TimeoutMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- TopicData.Init<NPrivate::ReadFieldRule<TopicDataMeta>>(Value.TopicData, Version);
- }
- case 7: {
- auto demand = TopicData.Next<NPrivate::ReadFieldRule<TopicDataMeta>>(Value.TopicData, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TProduceRequestData::MessageMeta::FlexibleVersionMin, TProduceRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -636,9 +303,6 @@ i32 TProduceRequestData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceRequestData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -650,119 +314,23 @@ TProduceRequestData::TTopicProduceData::TTopicProduceData()
: Name(NameMeta::Default)
{}
-
-class TProduceRequestData::TTopicProduceData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceRequestData::TTopicProduceData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceRequestData::TTopicProduceData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceRequestData::TTopicProduceData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData::TTopicProduceData";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<PartitionDataMeta>(_readable, _version, PartitionData);
- NPrivate::TReadStrategy<NameMeta> Name;
- NPrivate::TReadStrategy<PartitionDataMeta> PartitionData;
-};
-
-TProduceRequestData::TTopicProduceData::TReadContext::TReadContext(TProduceRequestData::TTopicProduceData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Name()
- , PartitionData()
-{}
-
-
-TReadDemand TProduceRequestData::TTopicProduceData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 1: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- PartitionData.Init<NPrivate::ReadFieldRule<PartitionDataMeta>>(Value.PartitionData, Version);
- }
- case 3: {
- auto demand = PartitionData.Next<NPrivate::ReadFieldRule<PartitionDataMeta>>(Value.PartitionData, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TProduceRequestData::TTopicProduceData::MessageMeta::FlexibleVersionMin, TProduceRequestData::TTopicProduceData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -791,9 +359,6 @@ i32 TProduceRequestData::TTopicProduceData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceRequestData::TTopicProduceData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -805,119 +370,23 @@ TProduceRequestData::TTopicProduceData::TPartitionProduceData::TPartitionProduce
: Index(IndexMeta::Default)
{}
-
-class TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceRequestData::TTopicProduceData::TPartitionProduceData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceRequestData::TTopicProduceData::TPartitionProduceData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceRequestData::TTopicProduceData::TPartitionProduceData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceRequestData::TTopicProduceData::TPartitionProduceData";
+ }
+ NPrivate::Read<IndexMeta>(_readable, _version, Index);
+ NPrivate::Read<RecordsMeta>(_readable, _version, Records);
- NPrivate::TReadStrategy<IndexMeta> Index;
- NPrivate::TReadStrategy<RecordsMeta> Records;
-};
-
-TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext::TReadContext(TProduceRequestData::TTopicProduceData::TPartitionProduceData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Index()
- , Records()
-{}
-
-
-TReadDemand TProduceRequestData::TTopicProduceData::TPartitionProduceData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Index.Init<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version);
- }
- case 1: {
- auto demand = Index.Next<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Records.Init<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version);
- }
- case 3: {
- auto demand = Records.Next<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TProduceRequestData::TTopicProduceData::TPartitionProduceData::MessageMeta::FlexibleVersionMin, TProduceRequestData::TTopicProduceData::TPartitionProduceData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -946,9 +415,6 @@ i32 TProduceRequestData::TTopicProduceData::TPartitionProduceData::Size(TKafkaVe
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceRequestData::TTopicProduceData::TPartitionProduceData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -960,119 +426,23 @@ TProduceResponseData::TProduceResponseData()
: ThrottleTimeMs(ThrottleTimeMsMeta::Default)
{}
-
-class TProduceResponseData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceResponseData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceResponseData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData";
+ }
+ NPrivate::Read<ResponsesMeta>(_readable, _version, Responses);
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
- NPrivate::TReadStrategy<ResponsesMeta> Responses;
- NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs;
-};
-
-TProduceResponseData::TReadContext::TReadContext(TProduceResponseData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Responses()
- , ThrottleTimeMs()
-{}
-
-
-TReadDemand TProduceResponseData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Responses.Init<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version);
- }
- case 1: {
- auto demand = Responses.Next<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- }
- case 3: {
- auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TProduceResponseData::MessageMeta::FlexibleVersionMin, TProduceResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -1101,9 +471,6 @@ i32 TProduceResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceResponseData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -1115,119 +482,23 @@ TProduceResponseData::TTopicProduceResponse::TTopicProduceResponse()
: Name(NameMeta::Default)
{}
-
-class TProduceResponseData::TTopicProduceResponse::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceResponseData::TTopicProduceResponse& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceResponseData::TTopicProduceResponse& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceResponseData::TTopicProduceResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<PartitionResponsesMeta>(_readable, _version, PartitionResponses);
- NPrivate::TReadStrategy<NameMeta> Name;
- NPrivate::TReadStrategy<PartitionResponsesMeta> PartitionResponses;
-};
-
-TProduceResponseData::TTopicProduceResponse::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Name()
- , PartitionResponses()
-{}
-
-
-TReadDemand TProduceResponseData::TTopicProduceResponse::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 1: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- PartitionResponses.Init<NPrivate::ReadFieldRule<PartitionResponsesMeta>>(Value.PartitionResponses, Version);
- }
- case 3: {
- auto demand = PartitionResponses.Next<NPrivate::ReadFieldRule<PartitionResponsesMeta>>(Value.PartitionResponses, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -1256,9 +527,6 @@ i32 TProduceResponseData::TTopicProduceResponse::Size(TKafkaVersion _version) co
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -1280,189 +548,28 @@ TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TPartiti
, ErrorMessage(ErrorMessageMeta::Default)
{}
-
-class TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<IndexMeta> Index;
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<BaseOffsetMeta> BaseOffset;
- NPrivate::TReadStrategy<LogAppendTimeMsMeta> LogAppendTimeMs;
- NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset;
- NPrivate::TReadStrategy<RecordErrorsMeta> RecordErrors;
- NPrivate::TReadStrategy<ErrorMessageMeta> ErrorMessage;
-};
-
-TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Index()
- , ErrorCode()
- , BaseOffset()
- , LogAppendTimeMs()
- , LogStartOffset()
- , RecordErrors()
- , ErrorMessage()
-{}
-
-
-TReadDemand TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Index.Init<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version);
- }
- case 1: {
- auto demand = Index.Next<NPrivate::ReadFieldRule<IndexMeta>>(Value.Index, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 3: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- BaseOffset.Init<NPrivate::ReadFieldRule<BaseOffsetMeta>>(Value.BaseOffset, Version);
- }
- case 5: {
- auto demand = BaseOffset.Next<NPrivate::ReadFieldRule<BaseOffsetMeta>>(Value.BaseOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- LogAppendTimeMs.Init<NPrivate::ReadFieldRule<LogAppendTimeMsMeta>>(Value.LogAppendTimeMs, Version);
- }
- case 7: {
- auto demand = LogAppendTimeMs.Next<NPrivate::ReadFieldRule<LogAppendTimeMsMeta>>(Value.LogAppendTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- }
- case 9: {
- auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- RecordErrors.Init<NPrivate::ReadFieldRule<RecordErrorsMeta>>(Value.RecordErrors, Version);
- }
- case 11: {
- auto demand = RecordErrors.Next<NPrivate::ReadFieldRule<RecordErrorsMeta>>(Value.RecordErrors, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- ++Step;
- ErrorMessage.Init<NPrivate::ReadFieldRule<ErrorMessageMeta>>(Value.ErrorMessage, Version);
- }
- case 13: {
- auto demand = ErrorMessage.Next<NPrivate::ReadFieldRule<ErrorMessageMeta>>(Value.ErrorMessage, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 15: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 17: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 18: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 19: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 16;
+void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse";
+ }
+ NPrivate::Read<IndexMeta>(_readable, _version, Index);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<BaseOffsetMeta>(_readable, _version, BaseOffset);
+ NPrivate::Read<LogAppendTimeMsMeta>(_readable, _version, LogAppendTimeMs);
+ NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset);
+ NPrivate::Read<RecordErrorsMeta>(_readable, _version, RecordErrors);
+ NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -1501,9 +608,6 @@ i32 TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::Size
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -1517,119 +621,23 @@ TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIn
, BatchIndexErrorMessage(BatchIndexErrorMessageMeta::Default)
{}
-
-class TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage";
+ }
+ NPrivate::Read<BatchIndexMeta>(_readable, _version, BatchIndex);
+ NPrivate::Read<BatchIndexErrorMessageMeta>(_readable, _version, BatchIndexErrorMessage);
- NPrivate::TReadStrategy<BatchIndexMeta> BatchIndex;
- NPrivate::TReadStrategy<BatchIndexErrorMessageMeta> BatchIndexErrorMessage;
-};
-
-TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext::TReadContext(TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , BatchIndex()
- , BatchIndexErrorMessage()
-{}
-
-
-TReadDemand TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- BatchIndex.Init<NPrivate::ReadFieldRule<BatchIndexMeta>>(Value.BatchIndex, Version);
- }
- case 1: {
- auto demand = BatchIndex.Next<NPrivate::ReadFieldRule<BatchIndexMeta>>(Value.BatchIndex, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- BatchIndexErrorMessage.Init<NPrivate::ReadFieldRule<BatchIndexErrorMessageMeta>>(Value.BatchIndexErrorMessage, Version);
- }
- case 3: {
- auto demand = BatchIndexErrorMessage.Next<NPrivate::ReadFieldRule<BatchIndexErrorMessageMeta>>(Value.BatchIndexErrorMessage, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::MessageMeta::FlexibleVersionMin, TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -1658,9 +666,6 @@ i32 TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBat
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TProduceResponseData::TTopicProduceResponse::TPartitionProduceResponse::TBatchIndexAndErrorMessage::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -1688,253 +693,35 @@ TFetchRequestData::TFetchRequestData()
, RackId(RackIdMeta::Default)
{}
-
-class TFetchRequestData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchRequestData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchRequestData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<ClusterIdMeta> ClusterId;
- NPrivate::TReadStrategy<ReplicaIdMeta> ReplicaId;
- NPrivate::TReadStrategy<MaxWaitMsMeta> MaxWaitMs;
- NPrivate::TReadStrategy<MinBytesMeta> MinBytes;
- NPrivate::TReadStrategy<MaxBytesMeta> MaxBytes;
- NPrivate::TReadStrategy<IsolationLevelMeta> IsolationLevel;
- NPrivate::TReadStrategy<SessionIdMeta> SessionId;
- NPrivate::TReadStrategy<SessionEpochMeta> SessionEpoch;
- NPrivate::TReadStrategy<TopicsMeta> Topics;
- NPrivate::TReadStrategy<ForgottenTopicsDataMeta> ForgottenTopicsData;
- NPrivate::TReadStrategy<RackIdMeta> RackId;
-};
-
-TFetchRequestData::TReadContext::TReadContext(TFetchRequestData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ClusterId()
- , ReplicaId()
- , MaxWaitMs()
- , MinBytes()
- , MaxBytes()
- , IsolationLevel()
- , SessionId()
- , SessionEpoch()
- , Topics()
- , ForgottenTopicsData()
- , RackId()
-{}
-
-
-TReadDemand TFetchRequestData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ClusterId.Init<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- }
- case 1: {
- auto demand = ClusterId.Next<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ReplicaId.Init<NPrivate::ReadFieldRule<ReplicaIdMeta>>(Value.ReplicaId, Version);
- }
- case 3: {
- auto demand = ReplicaId.Next<NPrivate::ReadFieldRule<ReplicaIdMeta>>(Value.ReplicaId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- MaxWaitMs.Init<NPrivate::ReadFieldRule<MaxWaitMsMeta>>(Value.MaxWaitMs, Version);
- }
- case 5: {
- auto demand = MaxWaitMs.Next<NPrivate::ReadFieldRule<MaxWaitMsMeta>>(Value.MaxWaitMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- MinBytes.Init<NPrivate::ReadFieldRule<MinBytesMeta>>(Value.MinBytes, Version);
- }
- case 7: {
- auto demand = MinBytes.Next<NPrivate::ReadFieldRule<MinBytesMeta>>(Value.MinBytes, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- MaxBytes.Init<NPrivate::ReadFieldRule<MaxBytesMeta>>(Value.MaxBytes, Version);
- }
- case 9: {
- auto demand = MaxBytes.Next<NPrivate::ReadFieldRule<MaxBytesMeta>>(Value.MaxBytes, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- IsolationLevel.Init<NPrivate::ReadFieldRule<IsolationLevelMeta>>(Value.IsolationLevel, Version);
- }
- case 11: {
- auto demand = IsolationLevel.Next<NPrivate::ReadFieldRule<IsolationLevelMeta>>(Value.IsolationLevel, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- ++Step;
- SessionId.Init<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version);
- }
- case 13: {
- auto demand = SessionId.Next<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- ++Step;
- SessionEpoch.Init<NPrivate::ReadFieldRule<SessionEpochMeta>>(Value.SessionEpoch, Version);
- }
- case 15: {
- auto demand = SessionEpoch.Next<NPrivate::ReadFieldRule<SessionEpochMeta>>(Value.SessionEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- ++Step;
- Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- }
- case 17: {
- auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 18: {
- ++Step;
- ForgottenTopicsData.Init<NPrivate::ReadFieldRule<ForgottenTopicsDataMeta>>(Value.ForgottenTopicsData, Version);
- }
- case 19: {
- auto demand = ForgottenTopicsData.Next<NPrivate::ReadFieldRule<ForgottenTopicsDataMeta>>(Value.ForgottenTopicsData, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 20: {
- ++Step;
- RackId.Init<NPrivate::ReadFieldRule<RackIdMeta>>(Value.RackId, Version);
- }
- case 21: {
- auto demand = RackId.Next<NPrivate::ReadFieldRule<RackIdMeta>>(Value.RackId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 22: {
- if (!NPrivate::VersionCheck<TFetchRequestData::MessageMeta::FlexibleVersionMin, TFetchRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 23: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 24: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 25: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 26: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 27: {
- TReadDemand demand;
- switch(Tag_.Value) {
- case 0: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- ClusterId.Init<NPrivate::ReadTaggedFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- }
- demand = ClusterId.Next<NPrivate::ReadTaggedFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- break;
- }
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 24;
+void TFetchRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData";
+ }
+ NPrivate::Read<ClusterIdMeta>(_readable, _version, ClusterId);
+ NPrivate::Read<ReplicaIdMeta>(_readable, _version, ReplicaId);
+ NPrivate::Read<MaxWaitMsMeta>(_readable, _version, MaxWaitMs);
+ NPrivate::Read<MinBytesMeta>(_readable, _version, MinBytes);
+ NPrivate::Read<MaxBytesMeta>(_readable, _version, MaxBytes);
+ NPrivate::Read<IsolationLevelMeta>(_readable, _version, IsolationLevel);
+ NPrivate::Read<SessionIdMeta>(_readable, _version, SessionId);
+ NPrivate::Read<SessionEpochMeta>(_readable, _version, SessionEpoch);
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+ NPrivate::Read<ForgottenTopicsDataMeta>(_readable, _version, ForgottenTopicsData);
+ NPrivate::Read<RackIdMeta>(_readable, _version, RackId);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ case ClusterIdMeta::Tag:
+ NPrivate::ReadTag<ClusterIdMeta>(_readable, _version, ClusterId);
+ break;
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -1982,9 +769,6 @@ i32 TFetchRequestData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchRequestData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -1998,133 +782,24 @@ TFetchRequestData::TFetchTopic::TFetchTopic()
, TopicId(TopicIdMeta::Default)
{}
-
-class TFetchRequestData::TFetchTopic::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchRequestData::TFetchTopic& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchRequestData::TFetchTopic& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchRequestData::TFetchTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TFetchTopic";
+ }
+ NPrivate::Read<TopicMeta>(_readable, _version, Topic);
+ NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
- NPrivate::TReadStrategy<TopicMeta> Topic;
- NPrivate::TReadStrategy<TopicIdMeta> TopicId;
- NPrivate::TReadStrategy<PartitionsMeta> Partitions;
-};
-
-TFetchRequestData::TFetchTopic::TReadContext::TReadContext(TFetchRequestData::TFetchTopic& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Topic()
- , TopicId()
- , Partitions()
-{}
-
-
-TReadDemand TFetchRequestData::TFetchTopic::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- }
- case 1: {
- auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- }
- case 3: {
- auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- }
- case 5: {
- auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TFetchRequestData::TFetchTopic::MessageMeta::FlexibleVersionMin, TFetchRequestData::TFetchTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -2155,9 +830,6 @@ i32 TFetchRequestData::TFetchTopic::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TFetchTopic::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -2179,175 +851,27 @@ TFetchRequestData::TFetchTopic::TFetchPartition::TFetchPartition()
, PartitionMaxBytes(PartitionMaxBytesMeta::Default)
{}
-
-class TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchRequestData::TFetchTopic::TFetchPartition& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchRequestData::TFetchTopic::TFetchPartition& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<PartitionMeta> Partition;
- NPrivate::TReadStrategy<CurrentLeaderEpochMeta> CurrentLeaderEpoch;
- NPrivate::TReadStrategy<FetchOffsetMeta> FetchOffset;
- NPrivate::TReadStrategy<LastFetchedEpochMeta> LastFetchedEpoch;
- NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset;
- NPrivate::TReadStrategy<PartitionMaxBytesMeta> PartitionMaxBytes;
-};
-
-TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext::TReadContext(TFetchRequestData::TFetchTopic::TFetchPartition& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Partition()
- , CurrentLeaderEpoch()
- , FetchOffset()
- , LastFetchedEpoch()
- , LogStartOffset()
- , PartitionMaxBytes()
-{}
-
-
-TReadDemand TFetchRequestData::TFetchTopic::TFetchPartition::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Partition.Init<NPrivate::ReadFieldRule<PartitionMeta>>(Value.Partition, Version);
- }
- case 1: {
- auto demand = Partition.Next<NPrivate::ReadFieldRule<PartitionMeta>>(Value.Partition, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- CurrentLeaderEpoch.Init<NPrivate::ReadFieldRule<CurrentLeaderEpochMeta>>(Value.CurrentLeaderEpoch, Version);
- }
- case 3: {
- auto demand = CurrentLeaderEpoch.Next<NPrivate::ReadFieldRule<CurrentLeaderEpochMeta>>(Value.CurrentLeaderEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- FetchOffset.Init<NPrivate::ReadFieldRule<FetchOffsetMeta>>(Value.FetchOffset, Version);
- }
- case 5: {
- auto demand = FetchOffset.Next<NPrivate::ReadFieldRule<FetchOffsetMeta>>(Value.FetchOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- LastFetchedEpoch.Init<NPrivate::ReadFieldRule<LastFetchedEpochMeta>>(Value.LastFetchedEpoch, Version);
- }
- case 7: {
- auto demand = LastFetchedEpoch.Next<NPrivate::ReadFieldRule<LastFetchedEpochMeta>>(Value.LastFetchedEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- }
- case 9: {
- auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- PartitionMaxBytes.Init<NPrivate::ReadFieldRule<PartitionMaxBytesMeta>>(Value.PartitionMaxBytes, Version);
- }
- case 11: {
- auto demand = PartitionMaxBytes.Next<NPrivate::ReadFieldRule<PartitionMaxBytesMeta>>(Value.PartitionMaxBytes, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- if (!NPrivate::VersionCheck<TFetchRequestData::TFetchTopic::TFetchPartition::MessageMeta::FlexibleVersionMin, TFetchRequestData::TFetchTopic::TFetchPartition::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 13: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 15: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 17: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 14;
+void TFetchRequestData::TFetchTopic::TFetchPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TFetchTopic::TFetchPartition";
+ }
+ NPrivate::Read<PartitionMeta>(_readable, _version, Partition);
+ NPrivate::Read<CurrentLeaderEpochMeta>(_readable, _version, CurrentLeaderEpoch);
+ NPrivate::Read<FetchOffsetMeta>(_readable, _version, FetchOffset);
+ NPrivate::Read<LastFetchedEpochMeta>(_readable, _version, LastFetchedEpoch);
+ NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset);
+ NPrivate::Read<PartitionMaxBytesMeta>(_readable, _version, PartitionMaxBytes);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -2384,9 +908,6 @@ i32 TFetchRequestData::TFetchTopic::TFetchPartition::Size(TKafkaVersion _version
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TFetchTopic::TFetchPartition::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -2400,133 +921,24 @@ TFetchRequestData::TForgottenTopic::TForgottenTopic()
, TopicId(TopicIdMeta::Default)
{}
-
-class TFetchRequestData::TForgottenTopic::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchRequestData::TForgottenTopic& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchRequestData::TForgottenTopic& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchRequestData::TForgottenTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchRequestData::TForgottenTopic";
+ }
+ NPrivate::Read<TopicMeta>(_readable, _version, Topic);
+ NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
- NPrivate::TReadStrategy<TopicMeta> Topic;
- NPrivate::TReadStrategy<TopicIdMeta> TopicId;
- NPrivate::TReadStrategy<PartitionsMeta> Partitions;
-};
-
-TFetchRequestData::TForgottenTopic::TReadContext::TReadContext(TFetchRequestData::TForgottenTopic& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Topic()
- , TopicId()
- , Partitions()
-{}
-
-
-TReadDemand TFetchRequestData::TForgottenTopic::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- }
- case 1: {
- auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- }
- case 3: {
- auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- }
- case 5: {
- auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TFetchRequestData::TForgottenTopic::MessageMeta::FlexibleVersionMin, TFetchRequestData::TForgottenTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -2557,9 +969,6 @@ i32 TFetchRequestData::TForgottenTopic::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchRequestData::TForgottenTopic::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -2575,147 +984,25 @@ TFetchResponseData::TFetchResponseData()
, SessionId(SessionIdMeta::Default)
{}
-
-class TFetchResponseData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<SessionIdMeta>(_readable, _version, SessionId);
+ NPrivate::Read<ResponsesMeta>(_readable, _version, Responses);
- NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs;
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<SessionIdMeta> SessionId;
- NPrivate::TReadStrategy<ResponsesMeta> Responses;
-};
-
-TFetchResponseData::TReadContext::TReadContext(TFetchResponseData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ThrottleTimeMs()
- , ErrorCode()
- , SessionId()
- , Responses()
-{}
-
-
-TReadDemand TFetchResponseData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- }
- case 1: {
- auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 3: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- SessionId.Init<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version);
- }
- case 5: {
- auto demand = SessionId.Next<NPrivate::ReadFieldRule<SessionIdMeta>>(Value.SessionId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- Responses.Init<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version);
- }
- case 7: {
- auto demand = Responses.Next<NPrivate::ReadFieldRule<ResponsesMeta>>(Value.Responses, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TFetchResponseData::MessageMeta::FlexibleVersionMin, TFetchResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -2748,9 +1035,6 @@ i32 TFetchResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -2764,133 +1048,24 @@ TFetchResponseData::TFetchableTopicResponse::TFetchableTopicResponse()
, TopicId(TopicIdMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::TFetchableTopicResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse";
+ }
+ NPrivate::Read<TopicMeta>(_readable, _version, Topic);
+ NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
- NPrivate::TReadStrategy<TopicMeta> Topic;
- NPrivate::TReadStrategy<TopicIdMeta> TopicId;
- NPrivate::TReadStrategy<PartitionsMeta> Partitions;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Topic()
- , TopicId()
- , Partitions()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Topic.Init<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- }
- case 1: {
- auto demand = Topic.Next<NPrivate::ReadFieldRule<TopicMeta>>(Value.Topic, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- }
- case 3: {
- auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- }
- case 5: {
- auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -2921,9 +1096,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::Size(TKafkaVersion _version) co
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -2945,269 +1117,41 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TPartitionData()
, PreferredReadReplica(PreferredReadReplicaMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse::TPartitionData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<PartitionIndexMeta> PartitionIndex;
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<HighWatermarkMeta> HighWatermark;
- NPrivate::TReadStrategy<LastStableOffsetMeta> LastStableOffset;
- NPrivate::TReadStrategy<LogStartOffsetMeta> LogStartOffset;
- NPrivate::TReadStrategy<DivergingEpochMeta> DivergingEpoch;
- NPrivate::TReadStrategy<CurrentLeaderMeta> CurrentLeader;
- NPrivate::TReadStrategy<SnapshotIdMeta> SnapshotId;
- NPrivate::TReadStrategy<AbortedTransactionsMeta> AbortedTransactions;
- NPrivate::TReadStrategy<PreferredReadReplicaMeta> PreferredReadReplica;
- NPrivate::TReadStrategy<RecordsMeta> Records;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , PartitionIndex()
- , ErrorCode()
- , HighWatermark()
- , LastStableOffset()
- , LogStartOffset()
- , DivergingEpoch()
- , CurrentLeader()
- , SnapshotId()
- , AbortedTransactions()
- , PreferredReadReplica()
- , Records()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- PartitionIndex.Init<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version);
- }
- case 1: {
- auto demand = PartitionIndex.Next<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 3: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- HighWatermark.Init<NPrivate::ReadFieldRule<HighWatermarkMeta>>(Value.HighWatermark, Version);
- }
- case 5: {
- auto demand = HighWatermark.Next<NPrivate::ReadFieldRule<HighWatermarkMeta>>(Value.HighWatermark, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- LastStableOffset.Init<NPrivate::ReadFieldRule<LastStableOffsetMeta>>(Value.LastStableOffset, Version);
- }
- case 7: {
- auto demand = LastStableOffset.Next<NPrivate::ReadFieldRule<LastStableOffsetMeta>>(Value.LastStableOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- LogStartOffset.Init<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- }
- case 9: {
- auto demand = LogStartOffset.Next<NPrivate::ReadFieldRule<LogStartOffsetMeta>>(Value.LogStartOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- DivergingEpoch.Init<NPrivate::ReadFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version);
- }
- case 11: {
- auto demand = DivergingEpoch.Next<NPrivate::ReadFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- ++Step;
- CurrentLeader.Init<NPrivate::ReadFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version);
- }
- case 13: {
- auto demand = CurrentLeader.Next<NPrivate::ReadFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- ++Step;
- SnapshotId.Init<NPrivate::ReadFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version);
- }
- case 15: {
- auto demand = SnapshotId.Next<NPrivate::ReadFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- ++Step;
- AbortedTransactions.Init<NPrivate::ReadFieldRule<AbortedTransactionsMeta>>(Value.AbortedTransactions, Version);
- }
- case 17: {
- auto demand = AbortedTransactions.Next<NPrivate::ReadFieldRule<AbortedTransactionsMeta>>(Value.AbortedTransactions, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 18: {
- ++Step;
- PreferredReadReplica.Init<NPrivate::ReadFieldRule<PreferredReadReplicaMeta>>(Value.PreferredReadReplica, Version);
- }
- case 19: {
- auto demand = PreferredReadReplica.Next<NPrivate::ReadFieldRule<PreferredReadReplicaMeta>>(Value.PreferredReadReplica, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 20: {
- ++Step;
- Records.Init<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version);
- }
- case 21: {
- auto demand = Records.Next<NPrivate::ReadFieldRule<RecordsMeta>>(Value.Records, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 22: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 23: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 24: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 25: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 26: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 27: {
- TReadDemand demand;
- switch(Tag_.Value) {
- case 0: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- DivergingEpoch.Init<NPrivate::ReadTaggedFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version);
- }
- demand = DivergingEpoch.Next<NPrivate::ReadTaggedFieldRule<DivergingEpochMeta>>(Value.DivergingEpoch, Version);
- break;
- }
- case 1: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- CurrentLeader.Init<NPrivate::ReadTaggedFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version);
- }
- demand = CurrentLeader.Next<NPrivate::ReadTaggedFieldRule<CurrentLeaderMeta>>(Value.CurrentLeader, Version);
- break;
- }
- case 2: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- SnapshotId.Init<NPrivate::ReadTaggedFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version);
- }
- demand = SnapshotId.Next<NPrivate::ReadTaggedFieldRule<SnapshotIdMeta>>(Value.SnapshotId, Version);
- break;
- }
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 24;
+void TFetchResponseData::TFetchableTopicResponse::TPartitionData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData";
+ }
+ NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<HighWatermarkMeta>(_readable, _version, HighWatermark);
+ NPrivate::Read<LastStableOffsetMeta>(_readable, _version, LastStableOffset);
+ NPrivate::Read<LogStartOffsetMeta>(_readable, _version, LogStartOffset);
+ NPrivate::Read<DivergingEpochMeta>(_readable, _version, DivergingEpoch);
+ NPrivate::Read<CurrentLeaderMeta>(_readable, _version, CurrentLeader);
+ NPrivate::Read<SnapshotIdMeta>(_readable, _version, SnapshotId);
+ NPrivate::Read<AbortedTransactionsMeta>(_readable, _version, AbortedTransactions);
+ NPrivate::Read<PreferredReadReplicaMeta>(_readable, _version, PreferredReadReplica);
+ NPrivate::Read<RecordsMeta>(_readable, _version, Records);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ case DivergingEpochMeta::Tag:
+ NPrivate::ReadTag<DivergingEpochMeta>(_readable, _version, DivergingEpoch);
+ break;
+ case CurrentLeaderMeta::Tag:
+ NPrivate::ReadTag<CurrentLeaderMeta>(_readable, _version, CurrentLeader);
+ break;
+ case SnapshotIdMeta::Tag:
+ NPrivate::ReadTag<SnapshotIdMeta>(_readable, _version, SnapshotId);
+ break;
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -3257,9 +1201,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::Size(TKafkaVers
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -3273,119 +1214,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TE
, EndOffset(EndOffsetMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset";
+ }
+ NPrivate::Read<EpochMeta>(_readable, _version, Epoch);
+ NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset);
- NPrivate::TReadStrategy<EpochMeta> Epoch;
- NPrivate::TReadStrategy<EndOffsetMeta> EndOffset;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Epoch()
- , EndOffset()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Epoch.Init<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version);
- }
- case 1: {
- auto demand = Epoch.Next<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- EndOffset.Init<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version);
- }
- case 3: {
- auto demand = EndOffset.Next<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -3414,9 +1259,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TEpochEndOffset::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -3430,119 +1272,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::
, LeaderEpoch(LeaderEpochMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch";
+ }
+ NPrivate::Read<LeaderIdMeta>(_readable, _version, LeaderId);
+ NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch);
- NPrivate::TReadStrategy<LeaderIdMeta> LeaderId;
- NPrivate::TReadStrategy<LeaderEpochMeta> LeaderEpoch;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , LeaderId()
- , LeaderEpoch()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- LeaderId.Init<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version);
- }
- case 1: {
- auto demand = LeaderId.Next<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- LeaderEpoch.Init<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version);
- }
- case 3: {
- auto demand = LeaderEpoch.Next<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -3571,9 +1317,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpo
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TLeaderIdAndEpoch::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -3587,119 +1330,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TSnaps
, Epoch(EpochMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId";
+ }
+ NPrivate::Read<EndOffsetMeta>(_readable, _version, EndOffset);
+ NPrivate::Read<EpochMeta>(_readable, _version, Epoch);
- NPrivate::TReadStrategy<EndOffsetMeta> EndOffset;
- NPrivate::TReadStrategy<EpochMeta> Epoch;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , EndOffset()
- , Epoch()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- EndOffset.Init<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version);
- }
- case 1: {
- auto demand = EndOffset.Next<NPrivate::ReadFieldRule<EndOffsetMeta>>(Value.EndOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Epoch.Init<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version);
- }
- case 3: {
- auto demand = Epoch.Next<NPrivate::ReadFieldRule<EpochMeta>>(Value.Epoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -3728,9 +1375,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::Si
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TSnapshotId::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -3744,119 +1388,23 @@ TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction
, FirstOffset(FirstOffsetMeta::Default)
{}
-
-class TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction";
+ }
+ NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId);
+ NPrivate::Read<FirstOffsetMeta>(_readable, _version, FirstOffset);
- NPrivate::TReadStrategy<ProducerIdMeta> ProducerId;
- NPrivate::TReadStrategy<FirstOffsetMeta> FirstOffset;
-};
-
-TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext::TReadContext(TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ProducerId()
- , FirstOffset()
-{}
-
-
-TReadDemand TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- }
- case 1: {
- auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- FirstOffset.Init<NPrivate::ReadFieldRule<FirstOffsetMeta>>(Value.FirstOffset, Version);
- }
- case 3: {
- auto demand = FirstOffset.Next<NPrivate::ReadFieldRule<FirstOffsetMeta>>(Value.FirstOffset, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::MessageMeta::FlexibleVersionMin, TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -3885,9 +1433,6 @@ i32 TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransac
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TFetchResponseData::TFetchableTopicResponse::TPartitionData::TAbortedTransaction::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -3903,147 +1448,25 @@ TMetadataRequestData::TMetadataRequestData()
, IncludeTopicAuthorizedOperations(IncludeTopicAuthorizedOperationsMeta::Default)
{}
-
-class TMetadataRequestData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataRequestData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataRequestData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TMetadataRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataRequestData";
+ }
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+ NPrivate::Read<AllowAutoTopicCreationMeta>(_readable, _version, AllowAutoTopicCreation);
+ NPrivate::Read<IncludeClusterAuthorizedOperationsMeta>(_readable, _version, IncludeClusterAuthorizedOperations);
+ NPrivate::Read<IncludeTopicAuthorizedOperationsMeta>(_readable, _version, IncludeTopicAuthorizedOperations);
- NPrivate::TReadStrategy<TopicsMeta> Topics;
- NPrivate::TReadStrategy<AllowAutoTopicCreationMeta> AllowAutoTopicCreation;
- NPrivate::TReadStrategy<IncludeClusterAuthorizedOperationsMeta> IncludeClusterAuthorizedOperations;
- NPrivate::TReadStrategy<IncludeTopicAuthorizedOperationsMeta> IncludeTopicAuthorizedOperations;
-};
-
-TMetadataRequestData::TReadContext::TReadContext(TMetadataRequestData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Topics()
- , AllowAutoTopicCreation()
- , IncludeClusterAuthorizedOperations()
- , IncludeTopicAuthorizedOperations()
-{}
-
-
-TReadDemand TMetadataRequestData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- }
- case 1: {
- auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- AllowAutoTopicCreation.Init<NPrivate::ReadFieldRule<AllowAutoTopicCreationMeta>>(Value.AllowAutoTopicCreation, Version);
- }
- case 3: {
- auto demand = AllowAutoTopicCreation.Next<NPrivate::ReadFieldRule<AllowAutoTopicCreationMeta>>(Value.AllowAutoTopicCreation, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- IncludeClusterAuthorizedOperations.Init<NPrivate::ReadFieldRule<IncludeClusterAuthorizedOperationsMeta>>(Value.IncludeClusterAuthorizedOperations, Version);
- }
- case 5: {
- auto demand = IncludeClusterAuthorizedOperations.Next<NPrivate::ReadFieldRule<IncludeClusterAuthorizedOperationsMeta>>(Value.IncludeClusterAuthorizedOperations, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- IncludeTopicAuthorizedOperations.Init<NPrivate::ReadFieldRule<IncludeTopicAuthorizedOperationsMeta>>(Value.IncludeTopicAuthorizedOperations, Version);
- }
- case 7: {
- auto demand = IncludeTopicAuthorizedOperations.Next<NPrivate::ReadFieldRule<IncludeTopicAuthorizedOperationsMeta>>(Value.IncludeTopicAuthorizedOperations, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TMetadataRequestData::MessageMeta::FlexibleVersionMin, TMetadataRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -4076,9 +1499,6 @@ i32 TMetadataRequestData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataRequestData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -4092,119 +1512,23 @@ TMetadataRequestData::TMetadataRequestTopic::TMetadataRequestTopic()
, Name(NameMeta::Default)
{}
-
-class TMetadataRequestData::TMetadataRequestTopic::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataRequestData::TMetadataRequestTopic& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataRequestData::TMetadataRequestTopic& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TMetadataRequestData::TMetadataRequestTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataRequestData::TMetadataRequestTopic";
+ }
+ NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId);
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
- NPrivate::TReadStrategy<TopicIdMeta> TopicId;
- NPrivate::TReadStrategy<NameMeta> Name;
-};
-
-TMetadataRequestData::TMetadataRequestTopic::TReadContext::TReadContext(TMetadataRequestData::TMetadataRequestTopic& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , TopicId()
- , Name()
-{}
-
-
-TReadDemand TMetadataRequestData::TMetadataRequestTopic::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- }
- case 1: {
- auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 3: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TMetadataRequestData::TMetadataRequestTopic::MessageMeta::FlexibleVersionMin, TMetadataRequestData::TMetadataRequestTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -4233,9 +1557,6 @@ i32 TMetadataRequestData::TMetadataRequestTopic::Size(TKafkaVersion _version) co
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataRequestData::TMetadataRequestTopic::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -4253,175 +1574,27 @@ TMetadataResponseData::TMetadataResponseData()
, ClusterAuthorizedOperations(ClusterAuthorizedOperationsMeta::Default)
{}
-
-class TMetadataResponseData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataResponseData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataResponseData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs;
- NPrivate::TReadStrategy<BrokersMeta> Brokers;
- NPrivate::TReadStrategy<ClusterIdMeta> ClusterId;
- NPrivate::TReadStrategy<ControllerIdMeta> ControllerId;
- NPrivate::TReadStrategy<TopicsMeta> Topics;
- NPrivate::TReadStrategy<ClusterAuthorizedOperationsMeta> ClusterAuthorizedOperations;
-};
-
-TMetadataResponseData::TReadContext::TReadContext(TMetadataResponseData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ThrottleTimeMs()
- , Brokers()
- , ClusterId()
- , ControllerId()
- , Topics()
- , ClusterAuthorizedOperations()
-{}
-
-
-TReadDemand TMetadataResponseData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- }
- case 1: {
- auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Brokers.Init<NPrivate::ReadFieldRule<BrokersMeta>>(Value.Brokers, Version);
- }
- case 3: {
- auto demand = Brokers.Next<NPrivate::ReadFieldRule<BrokersMeta>>(Value.Brokers, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- ClusterId.Init<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- }
- case 5: {
- auto demand = ClusterId.Next<NPrivate::ReadFieldRule<ClusterIdMeta>>(Value.ClusterId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- ControllerId.Init<NPrivate::ReadFieldRule<ControllerIdMeta>>(Value.ControllerId, Version);
- }
- case 7: {
- auto demand = ControllerId.Next<NPrivate::ReadFieldRule<ControllerIdMeta>>(Value.ControllerId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- Topics.Init<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- }
- case 9: {
- auto demand = Topics.Next<NPrivate::ReadFieldRule<TopicsMeta>>(Value.Topics, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- ClusterAuthorizedOperations.Init<NPrivate::ReadFieldRule<ClusterAuthorizedOperationsMeta>>(Value.ClusterAuthorizedOperations, Version);
- }
- case 11: {
- auto demand = ClusterAuthorizedOperations.Next<NPrivate::ReadFieldRule<ClusterAuthorizedOperationsMeta>>(Value.ClusterAuthorizedOperations, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- if (!NPrivate::VersionCheck<TMetadataResponseData::MessageMeta::FlexibleVersionMin, TMetadataResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 13: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 15: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 17: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 14;
+void TMetadataResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<BrokersMeta>(_readable, _version, Brokers);
+ NPrivate::Read<ClusterIdMeta>(_readable, _version, ClusterId);
+ NPrivate::Read<ControllerIdMeta>(_readable, _version, ControllerId);
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+ NPrivate::Read<ClusterAuthorizedOperationsMeta>(_readable, _version, ClusterAuthorizedOperations);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -4458,9 +1631,6 @@ i32 TMetadataResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -4478,147 +1648,25 @@ TMetadataResponseData::TMetadataResponseBroker::TMetadataResponseBroker()
, Rack(RackMeta::Default)
{}
-
-class TMetadataResponseData::TMetadataResponseBroker::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataResponseData::TMetadataResponseBroker& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataResponseData::TMetadataResponseBroker& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TMetadataResponseData::TMetadataResponseBroker::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseBroker";
+ }
+ NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId);
+ NPrivate::Read<HostMeta>(_readable, _version, Host);
+ NPrivate::Read<PortMeta>(_readable, _version, Port);
+ NPrivate::Read<RackMeta>(_readable, _version, Rack);
- NPrivate::TReadStrategy<NodeIdMeta> NodeId;
- NPrivate::TReadStrategy<HostMeta> Host;
- NPrivate::TReadStrategy<PortMeta> Port;
- NPrivate::TReadStrategy<RackMeta> Rack;
-};
-
-TMetadataResponseData::TMetadataResponseBroker::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseBroker& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , NodeId()
- , Host()
- , Port()
- , Rack()
-{}
-
-
-TReadDemand TMetadataResponseData::TMetadataResponseBroker::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- NodeId.Init<NPrivate::ReadFieldRule<NodeIdMeta>>(Value.NodeId, Version);
- }
- case 1: {
- auto demand = NodeId.Next<NPrivate::ReadFieldRule<NodeIdMeta>>(Value.NodeId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Host.Init<NPrivate::ReadFieldRule<HostMeta>>(Value.Host, Version);
- }
- case 3: {
- auto demand = Host.Next<NPrivate::ReadFieldRule<HostMeta>>(Value.Host, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- Port.Init<NPrivate::ReadFieldRule<PortMeta>>(Value.Port, Version);
- }
- case 5: {
- auto demand = Port.Next<NPrivate::ReadFieldRule<PortMeta>>(Value.Port, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- Rack.Init<NPrivate::ReadFieldRule<RackMeta>>(Value.Rack, Version);
- }
- case 7: {
- auto demand = Rack.Next<NPrivate::ReadFieldRule<RackMeta>>(Value.Rack, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseBroker::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseBroker::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -4651,9 +1699,6 @@ i32 TMetadataResponseData::TMetadataResponseBroker::Size(TKafkaVersion _version)
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseBroker::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -4673,175 +1718,27 @@ TMetadataResponseData::TMetadataResponseTopic::TMetadataResponseTopic()
, TopicAuthorizedOperations(TopicAuthorizedOperationsMeta::Default)
{}
-
-class TMetadataResponseData::TMetadataResponseTopic::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataResponseData::TMetadataResponseTopic& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataResponseData::TMetadataResponseTopic& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<NameMeta> Name;
- NPrivate::TReadStrategy<TopicIdMeta> TopicId;
- NPrivate::TReadStrategy<IsInternalMeta> IsInternal;
- NPrivate::TReadStrategy<PartitionsMeta> Partitions;
- NPrivate::TReadStrategy<TopicAuthorizedOperationsMeta> TopicAuthorizedOperations;
-};
-
-TMetadataResponseData::TMetadataResponseTopic::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseTopic& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ErrorCode()
- , Name()
- , TopicId()
- , IsInternal()
- , Partitions()
- , TopicAuthorizedOperations()
-{}
-
-
-TReadDemand TMetadataResponseData::TMetadataResponseTopic::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 1: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 3: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- TopicId.Init<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- }
- case 5: {
- auto demand = TopicId.Next<NPrivate::ReadFieldRule<TopicIdMeta>>(Value.TopicId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- IsInternal.Init<NPrivate::ReadFieldRule<IsInternalMeta>>(Value.IsInternal, Version);
- }
- case 7: {
- auto demand = IsInternal.Next<NPrivate::ReadFieldRule<IsInternalMeta>>(Value.IsInternal, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- Partitions.Init<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- }
- case 9: {
- auto demand = Partitions.Next<NPrivate::ReadFieldRule<PartitionsMeta>>(Value.Partitions, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- TopicAuthorizedOperations.Init<NPrivate::ReadFieldRule<TopicAuthorizedOperationsMeta>>(Value.TopicAuthorizedOperations, Version);
- }
- case 11: {
- auto demand = TopicAuthorizedOperations.Next<NPrivate::ReadFieldRule<TopicAuthorizedOperationsMeta>>(Value.TopicAuthorizedOperations, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseTopic::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseTopic::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 13: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 15: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 17: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 14;
+void TMetadataResponseData::TMetadataResponseTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseTopic";
+ }
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<TopicIdMeta>(_readable, _version, TopicId);
+ NPrivate::Read<IsInternalMeta>(_readable, _version, IsInternal);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
+ NPrivate::Read<TopicAuthorizedOperationsMeta>(_readable, _version, TopicAuthorizedOperations);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -4878,9 +1775,6 @@ i32 TMetadataResponseData::TMetadataResponseTopic::Size(TKafkaVersion _version)
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseTopic::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -4898,189 +1792,28 @@ TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TMeta
, LeaderEpoch(LeaderEpochMeta::Default)
{}
-
-class TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<PartitionIndexMeta> PartitionIndex;
- NPrivate::TReadStrategy<LeaderIdMeta> LeaderId;
- NPrivate::TReadStrategy<LeaderEpochMeta> LeaderEpoch;
- NPrivate::TReadStrategy<ReplicaNodesMeta> ReplicaNodes;
- NPrivate::TReadStrategy<IsrNodesMeta> IsrNodes;
- NPrivate::TReadStrategy<OfflineReplicasMeta> OfflineReplicas;
-};
-
-TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext::TReadContext(TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ErrorCode()
- , PartitionIndex()
- , LeaderId()
- , LeaderEpoch()
- , ReplicaNodes()
- , IsrNodes()
- , OfflineReplicas()
-{}
-
-
-TReadDemand TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 1: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- PartitionIndex.Init<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version);
- }
- case 3: {
- auto demand = PartitionIndex.Next<NPrivate::ReadFieldRule<PartitionIndexMeta>>(Value.PartitionIndex, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- LeaderId.Init<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version);
- }
- case 5: {
- auto demand = LeaderId.Next<NPrivate::ReadFieldRule<LeaderIdMeta>>(Value.LeaderId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- LeaderEpoch.Init<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version);
- }
- case 7: {
- auto demand = LeaderEpoch.Next<NPrivate::ReadFieldRule<LeaderEpochMeta>>(Value.LeaderEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- ReplicaNodes.Init<NPrivate::ReadFieldRule<ReplicaNodesMeta>>(Value.ReplicaNodes, Version);
- }
- case 9: {
- auto demand = ReplicaNodes.Next<NPrivate::ReadFieldRule<ReplicaNodesMeta>>(Value.ReplicaNodes, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- IsrNodes.Init<NPrivate::ReadFieldRule<IsrNodesMeta>>(Value.IsrNodes, Version);
- }
- case 11: {
- auto demand = IsrNodes.Next<NPrivate::ReadFieldRule<IsrNodesMeta>>(Value.IsrNodes, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- ++Step;
- OfflineReplicas.Init<NPrivate::ReadFieldRule<OfflineReplicasMeta>>(Value.OfflineReplicas, Version);
- }
- case 13: {
- auto demand = OfflineReplicas.Next<NPrivate::ReadFieldRule<OfflineReplicasMeta>>(Value.OfflineReplicas, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- if (!NPrivate::VersionCheck<TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::MessageMeta::FlexibleVersionMin, TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 15: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 17: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 18: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 19: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 16;
+void TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition";
+ }
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex);
+ NPrivate::Read<LeaderIdMeta>(_readable, _version, LeaderId);
+ NPrivate::Read<LeaderEpochMeta>(_readable, _version, LeaderEpoch);
+ NPrivate::Read<ReplicaNodesMeta>(_readable, _version, ReplicaNodes);
+ NPrivate::Read<IsrNodesMeta>(_readable, _version, IsrNodes);
+ NPrivate::Read<OfflineReplicasMeta>(_readable, _version, OfflineReplicas);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -5119,9 +1852,6 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -5135,119 +1865,23 @@ TApiVersionsRequestData::TApiVersionsRequestData()
, ClientSoftwareVersion(ClientSoftwareVersionMeta::Default)
{}
-
-class TApiVersionsRequestData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TApiVersionsRequestData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TApiVersionsRequestData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TApiVersionsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TApiVersionsRequestData";
+ }
+ NPrivate::Read<ClientSoftwareNameMeta>(_readable, _version, ClientSoftwareName);
+ NPrivate::Read<ClientSoftwareVersionMeta>(_readable, _version, ClientSoftwareVersion);
- NPrivate::TReadStrategy<ClientSoftwareNameMeta> ClientSoftwareName;
- NPrivate::TReadStrategy<ClientSoftwareVersionMeta> ClientSoftwareVersion;
-};
-
-TApiVersionsRequestData::TReadContext::TReadContext(TApiVersionsRequestData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ClientSoftwareName()
- , ClientSoftwareVersion()
-{}
-
-
-TReadDemand TApiVersionsRequestData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ClientSoftwareName.Init<NPrivate::ReadFieldRule<ClientSoftwareNameMeta>>(Value.ClientSoftwareName, Version);
- }
- case 1: {
- auto demand = ClientSoftwareName.Next<NPrivate::ReadFieldRule<ClientSoftwareNameMeta>>(Value.ClientSoftwareName, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ClientSoftwareVersion.Init<NPrivate::ReadFieldRule<ClientSoftwareVersionMeta>>(Value.ClientSoftwareVersion, Version);
- }
- case 3: {
- auto demand = ClientSoftwareVersion.Next<NPrivate::ReadFieldRule<ClientSoftwareVersionMeta>>(Value.ClientSoftwareVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- if (!NPrivate::VersionCheck<TApiVersionsRequestData::MessageMeta::FlexibleVersionMin, TApiVersionsRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 5: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 7: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 9: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 6;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -5276,9 +1910,6 @@ i32 TApiVersionsRequestData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TApiVersionsRequestData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -5296,221 +1927,40 @@ TApiVersionsResponseData::TApiVersionsResponseData()
, ZkMigrationReady(ZkMigrationReadyMeta::Default)
{}
-
-class TApiVersionsResponseData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TApiVersionsResponseData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TApiVersionsResponseData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
-
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<ApiKeysMeta> ApiKeys;
- NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs;
- NPrivate::TReadStrategy<SupportedFeaturesMeta> SupportedFeatures;
- NPrivate::TReadStrategy<FinalizedFeaturesEpochMeta> FinalizedFeaturesEpoch;
- NPrivate::TReadStrategy<FinalizedFeaturesMeta> FinalizedFeatures;
- NPrivate::TReadStrategy<ZkMigrationReadyMeta> ZkMigrationReady;
-};
-
-TApiVersionsResponseData::TReadContext::TReadContext(TApiVersionsResponseData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ErrorCode()
- , ApiKeys()
- , ThrottleTimeMs()
- , SupportedFeatures()
- , FinalizedFeaturesEpoch()
- , FinalizedFeatures()
- , ZkMigrationReady()
-{}
-
-
-TReadDemand TApiVersionsResponseData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 1: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ApiKeys.Init<NPrivate::ReadFieldRule<ApiKeysMeta>>(Value.ApiKeys, Version);
- }
- case 3: {
- auto demand = ApiKeys.Next<NPrivate::ReadFieldRule<ApiKeysMeta>>(Value.ApiKeys, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- }
- case 5: {
- auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- SupportedFeatures.Init<NPrivate::ReadFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version);
- }
- case 7: {
- auto demand = SupportedFeatures.Next<NPrivate::ReadFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- FinalizedFeaturesEpoch.Init<NPrivate::ReadFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version);
- }
- case 9: {
- auto demand = FinalizedFeaturesEpoch.Next<NPrivate::ReadFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- FinalizedFeatures.Init<NPrivate::ReadFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version);
- }
- case 11: {
- auto demand = FinalizedFeatures.Next<NPrivate::ReadFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- ++Step;
- ZkMigrationReady.Init<NPrivate::ReadFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version);
- }
- case 13: {
- auto demand = ZkMigrationReady.Next<NPrivate::ReadFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 14: {
- if (!NPrivate::VersionCheck<TApiVersionsResponseData::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 15: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 16: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 17: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 18: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 19: {
- TReadDemand demand;
- switch(Tag_.Value) {
- case 0: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- SupportedFeatures.Init<NPrivate::ReadTaggedFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version);
- }
- demand = SupportedFeatures.Next<NPrivate::ReadTaggedFieldRule<SupportedFeaturesMeta>>(Value.SupportedFeatures, Version);
- break;
- }
- case 1: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- FinalizedFeaturesEpoch.Init<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version);
- }
- demand = FinalizedFeaturesEpoch.Next<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesEpochMeta>>(Value.FinalizedFeaturesEpoch, Version);
- break;
- }
- case 2: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- FinalizedFeatures.Init<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version);
- }
- demand = FinalizedFeatures.Next<NPrivate::ReadTaggedFieldRule<FinalizedFeaturesMeta>>(Value.FinalizedFeatures, Version);
- break;
- }
- case 3: {
- if (!TagInitialized_) {
- TagInitialized_=true;
- ZkMigrationReady.Init<NPrivate::ReadTaggedFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version);
- }
- demand = ZkMigrationReady.Next<NPrivate::ReadTaggedFieldRule<ZkMigrationReadyMeta>>(Value.ZkMigrationReady, Version);
- break;
- }
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 16;
+void TApiVersionsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData";
+ }
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ApiKeysMeta>(_readable, _version, ApiKeys);
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures);
+ NPrivate::Read<FinalizedFeaturesEpochMeta>(_readable, _version, FinalizedFeaturesEpoch);
+ NPrivate::Read<FinalizedFeaturesMeta>(_readable, _version, FinalizedFeatures);
+ NPrivate::Read<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ case SupportedFeaturesMeta::Tag:
+ NPrivate::ReadTag<SupportedFeaturesMeta>(_readable, _version, SupportedFeatures);
+ break;
+ case FinalizedFeaturesEpochMeta::Tag:
+ NPrivate::ReadTag<FinalizedFeaturesEpochMeta>(_readable, _version, FinalizedFeaturesEpoch);
+ break;
+ case FinalizedFeaturesMeta::Tag:
+ NPrivate::ReadTag<FinalizedFeaturesMeta>(_readable, _version, FinalizedFeatures);
+ break;
+ case ZkMigrationReadyMeta::Tag:
+ NPrivate::ReadTag<ZkMigrationReadyMeta>(_readable, _version, ZkMigrationReady);
+ break;
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -5553,9 +2003,6 @@ i32 TApiVersionsResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -5571,133 +2018,24 @@ TApiVersionsResponseData::TApiVersion::TApiVersion()
, MaxVersion(MaxVersionMeta::Default)
{}
-
-class TApiVersionsResponseData::TApiVersion::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TApiVersionsResponseData::TApiVersion& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TApiVersionsResponseData::TApiVersion& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TApiVersionsResponseData::TApiVersion::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TApiVersion";
+ }
+ NPrivate::Read<ApiKeyMeta>(_readable, _version, ApiKey);
+ NPrivate::Read<MinVersionMeta>(_readable, _version, MinVersion);
+ NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion);
- NPrivate::TReadStrategy<ApiKeyMeta> ApiKey;
- NPrivate::TReadStrategy<MinVersionMeta> MinVersion;
- NPrivate::TReadStrategy<MaxVersionMeta> MaxVersion;
-};
-
-TApiVersionsResponseData::TApiVersion::TReadContext::TReadContext(TApiVersionsResponseData::TApiVersion& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ApiKey()
- , MinVersion()
- , MaxVersion()
-{}
-
-
-TReadDemand TApiVersionsResponseData::TApiVersion::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ApiKey.Init<NPrivate::ReadFieldRule<ApiKeyMeta>>(Value.ApiKey, Version);
- }
- case 1: {
- auto demand = ApiKey.Next<NPrivate::ReadFieldRule<ApiKeyMeta>>(Value.ApiKey, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- MinVersion.Init<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version);
- }
- case 3: {
- auto demand = MinVersion.Next<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- MaxVersion.Init<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version);
- }
- case 5: {
- auto demand = MaxVersion.Next<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TApiVersionsResponseData::TApiVersion::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TApiVersion::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -5728,9 +2066,6 @@ i32 TApiVersionsResponseData::TApiVersion::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TApiVersion::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -5746,133 +2081,24 @@ TApiVersionsResponseData::TSupportedFeatureKey::TSupportedFeatureKey()
, MaxVersion(MaxVersionMeta::Default)
{}
-
-class TApiVersionsResponseData::TSupportedFeatureKey::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TApiVersionsResponseData::TSupportedFeatureKey& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TApiVersionsResponseData::TSupportedFeatureKey& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TApiVersionsResponseData::TSupportedFeatureKey::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TSupportedFeatureKey";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<MinVersionMeta>(_readable, _version, MinVersion);
+ NPrivate::Read<MaxVersionMeta>(_readable, _version, MaxVersion);
- NPrivate::TReadStrategy<NameMeta> Name;
- NPrivate::TReadStrategy<MinVersionMeta> MinVersion;
- NPrivate::TReadStrategy<MaxVersionMeta> MaxVersion;
-};
-
-TApiVersionsResponseData::TSupportedFeatureKey::TReadContext::TReadContext(TApiVersionsResponseData::TSupportedFeatureKey& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Name()
- , MinVersion()
- , MaxVersion()
-{}
-
-
-TReadDemand TApiVersionsResponseData::TSupportedFeatureKey::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 1: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- MinVersion.Init<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version);
- }
- case 3: {
- auto demand = MinVersion.Next<NPrivate::ReadFieldRule<MinVersionMeta>>(Value.MinVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- MaxVersion.Init<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version);
- }
- case 5: {
- auto demand = MaxVersion.Next<NPrivate::ReadFieldRule<MaxVersionMeta>>(Value.MaxVersion, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TApiVersionsResponseData::TSupportedFeatureKey::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TSupportedFeatureKey::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -5903,9 +2129,6 @@ i32 TApiVersionsResponseData::TSupportedFeatureKey::Size(TKafkaVersion _version)
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TSupportedFeatureKey::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -5921,133 +2144,24 @@ TApiVersionsResponseData::TFinalizedFeatureKey::TFinalizedFeatureKey()
, MinVersionLevel(MinVersionLevelMeta::Default)
{}
-
-class TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TApiVersionsResponseData::TFinalizedFeatureKey& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TApiVersionsResponseData::TFinalizedFeatureKey& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TApiVersionsResponseData::TFinalizedFeatureKey::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TApiVersionsResponseData::TFinalizedFeatureKey";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<MaxVersionLevelMeta>(_readable, _version, MaxVersionLevel);
+ NPrivate::Read<MinVersionLevelMeta>(_readable, _version, MinVersionLevel);
- NPrivate::TReadStrategy<NameMeta> Name;
- NPrivate::TReadStrategy<MaxVersionLevelMeta> MaxVersionLevel;
- NPrivate::TReadStrategy<MinVersionLevelMeta> MinVersionLevel;
-};
-
-TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext::TReadContext(TApiVersionsResponseData::TFinalizedFeatureKey& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , Name()
- , MaxVersionLevel()
- , MinVersionLevel()
-{}
-
-
-TReadDemand TApiVersionsResponseData::TFinalizedFeatureKey::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- Name.Init<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- }
- case 1: {
- auto demand = Name.Next<NPrivate::ReadFieldRule<NameMeta>>(Value.Name, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- MaxVersionLevel.Init<NPrivate::ReadFieldRule<MaxVersionLevelMeta>>(Value.MaxVersionLevel, Version);
- }
- case 3: {
- auto demand = MaxVersionLevel.Next<NPrivate::ReadFieldRule<MaxVersionLevelMeta>>(Value.MaxVersionLevel, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- MinVersionLevel.Init<NPrivate::ReadFieldRule<MinVersionLevelMeta>>(Value.MinVersionLevel, Version);
- }
- case 5: {
- auto demand = MinVersionLevel.Next<NPrivate::ReadFieldRule<MinVersionLevelMeta>>(Value.MinVersionLevel, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- if (!NPrivate::VersionCheck<TApiVersionsResponseData::TFinalizedFeatureKey::MessageMeta::FlexibleVersionMin, TApiVersionsResponseData::TFinalizedFeatureKey::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 7: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 9: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 11: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 8;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -6078,9 +2192,6 @@ i32 TApiVersionsResponseData::TFinalizedFeatureKey::Size(TKafkaVersion _version)
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TApiVersionsResponseData::TFinalizedFeatureKey::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -6098,147 +2209,25 @@ TInitProducerIdRequestData::TInitProducerIdRequestData()
, ProducerEpoch(ProducerEpochMeta::Default)
{}
-
-class TInitProducerIdRequestData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TInitProducerIdRequestData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TInitProducerIdRequestData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TInitProducerIdRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TInitProducerIdRequestData";
+ }
+ NPrivate::Read<TransactionalIdMeta>(_readable, _version, TransactionalId);
+ NPrivate::Read<TransactionTimeoutMsMeta>(_readable, _version, TransactionTimeoutMs);
+ NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId);
+ NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch);
- NPrivate::TReadStrategy<TransactionalIdMeta> TransactionalId;
- NPrivate::TReadStrategy<TransactionTimeoutMsMeta> TransactionTimeoutMs;
- NPrivate::TReadStrategy<ProducerIdMeta> ProducerId;
- NPrivate::TReadStrategy<ProducerEpochMeta> ProducerEpoch;
-};
-
-TInitProducerIdRequestData::TReadContext::TReadContext(TInitProducerIdRequestData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , TransactionalId()
- , TransactionTimeoutMs()
- , ProducerId()
- , ProducerEpoch()
-{}
-
-
-TReadDemand TInitProducerIdRequestData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- TransactionalId.Init<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version);
- }
- case 1: {
- auto demand = TransactionalId.Next<NPrivate::ReadFieldRule<TransactionalIdMeta>>(Value.TransactionalId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- TransactionTimeoutMs.Init<NPrivate::ReadFieldRule<TransactionTimeoutMsMeta>>(Value.TransactionTimeoutMs, Version);
- }
- case 3: {
- auto demand = TransactionTimeoutMs.Next<NPrivate::ReadFieldRule<TransactionTimeoutMsMeta>>(Value.TransactionTimeoutMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- }
- case 5: {
- auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- ProducerEpoch.Init<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version);
- }
- case 7: {
- auto demand = ProducerEpoch.Next<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TInitProducerIdRequestData::MessageMeta::FlexibleVersionMin, TInitProducerIdRequestData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -6271,9 +2260,6 @@ i32 TInitProducerIdRequestData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TInitProducerIdRequestData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
//
@@ -6291,147 +2277,25 @@ TInitProducerIdResponseData::TInitProducerIdResponseData()
, ProducerEpoch(ProducerEpochMeta::Default)
{}
-
-class TInitProducerIdResponseData::TReadContext : public NKafka::TReadContext {
-public:
- TReadContext(TInitProducerIdResponseData& value, TKafkaVersion version);
- TReadDemand Next() override;
-private:
- TInitProducerIdResponseData& Value;
- TKafkaVersion Version;
- size_t Step;
-
- NPrivate::ReadUnsignedVarintStrategy NumTaggedFields_;
- NPrivate::ReadUnsignedVarintStrategy Tag_;
- NPrivate::ReadUnsignedVarintStrategy TagSize_;
- bool TagInitialized_;
+void TInitProducerIdResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersionMin, MessageMeta::PresentVersionMax>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TInitProducerIdResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ProducerIdMeta>(_readable, _version, ProducerId);
+ NPrivate::Read<ProducerEpochMeta>(_readable, _version, ProducerEpoch);
- NPrivate::TReadStrategy<ThrottleTimeMsMeta> ThrottleTimeMs;
- NPrivate::TReadStrategy<ErrorCodeMeta> ErrorCode;
- NPrivate::TReadStrategy<ProducerIdMeta> ProducerId;
- NPrivate::TReadStrategy<ProducerEpochMeta> ProducerEpoch;
-};
-
-TInitProducerIdResponseData::TReadContext::TReadContext(TInitProducerIdResponseData& value, TKafkaVersion version)
- : Value(value)
- , Version(version)
- , Step(0)
- , ThrottleTimeMs()
- , ErrorCode()
- , ProducerId()
- , ProducerEpoch()
-{}
-
-
-TReadDemand TInitProducerIdResponseData::TReadContext::Next() {
- while(true) {
- switch(Step) {
- case 0: {
- ++Step;
- ThrottleTimeMs.Init<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- }
- case 1: {
- auto demand = ThrottleTimeMs.Next<NPrivate::ReadFieldRule<ThrottleTimeMsMeta>>(Value.ThrottleTimeMs, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 2: {
- ++Step;
- ErrorCode.Init<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- }
- case 3: {
- auto demand = ErrorCode.Next<NPrivate::ReadFieldRule<ErrorCodeMeta>>(Value.ErrorCode, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 4: {
- ++Step;
- ProducerId.Init<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- }
- case 5: {
- auto demand = ProducerId.Next<NPrivate::ReadFieldRule<ProducerIdMeta>>(Value.ProducerId, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 6: {
- ++Step;
- ProducerEpoch.Init<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version);
- }
- case 7: {
- auto demand = ProducerEpoch.Next<NPrivate::ReadFieldRule<ProducerEpochMeta>>(Value.ProducerEpoch, Version);
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 8: {
- if (!NPrivate::VersionCheck<TInitProducerIdResponseData::MessageMeta::FlexibleVersionMin, TInitProducerIdResponseData::MessageMeta::FlexibleVersionMax>(Version)) return NoDemand;
- ++Step;
- NumTaggedFields_.Init();
- }
- case 9: {
- auto demand = NumTaggedFields_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 10: {
- ++Step;
- if (NumTaggedFields_.Value <= 0) return NoDemand;
- --NumTaggedFields_.Value;
- Tag_.Init();
- TagSize_.Init();
- TagInitialized_=false;
- }
- case 11: {
- auto demand = Tag_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 12: {
- auto demand = TagSize_.Next();
- if (demand) {
- return demand;
- } else {
- ++Step;
- }
- }
- case 13: {
- TReadDemand demand;
- switch(Tag_.Value) {
- default: {
- if (!TagInitialized_) {
- TagInitialized_ = true;
- demand = TReadDemand(TagSize_.Value);
- } else {
- demand = NoDemand;
- }
- }
- }
- if (demand) {
- return demand;
- } else {
- Step = 10;
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersionMin, MessageMeta::FlexibleVersionMax>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
break;
- }
}
- default:
- return NoDemand;
}
}
}
@@ -6464,7 +2328,4 @@ i32 TInitProducerIdResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
-std::unique_ptr<NKafka::TReadContext> TInitProducerIdResponseData::CreateReadContext(TKafkaVersion _version) {
- return std::unique_ptr<NKafka::TReadContext>(new TReadContext(*this, _version));
-}
} //namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index c070177e7f..4ca58fb3cd 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -116,11 +116,9 @@ public:
};
ClientIdMeta::Type ClientId;
- class TReadContext;
-
i16 ApiKey() const override { return HEADER; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TRequestHeaderData& other) const = default;
@@ -158,11 +156,9 @@ public:
};
CorrelationIdMeta::Type CorrelationId;
- class TReadContext;
-
i16 ApiKey() const override { return HEADER; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TResponseHeaderData& other) const = default;
@@ -242,10 +238,8 @@ public:
};
RecordsMeta::Type Records;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TPartitionProduceData& other) const = default;
@@ -290,10 +284,8 @@ public:
};
PartitionDataMeta::Type PartitionData;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TTopicProduceData& other) const = default;
@@ -376,11 +368,9 @@ public:
};
TopicDataMeta::Type TopicData;
- class TReadContext;
-
i16 ApiKey() const override { return PRODUCE; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TProduceRequestData& other) const = default;
@@ -473,10 +463,8 @@ public:
};
BatchIndexErrorMessageMeta::Type BatchIndexErrorMessage;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TBatchIndexAndErrorMessage& other) const = default;
@@ -616,10 +604,8 @@ public:
};
ErrorMessageMeta::Type ErrorMessage;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TPartitionProduceResponse& other) const = default;
@@ -664,10 +650,8 @@ public:
};
PartitionResponsesMeta::Type PartitionResponses;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TTopicProduceResponse& other) const = default;
@@ -712,11 +696,9 @@ public:
};
ThrottleTimeMsMeta::Type ThrottleTimeMs;
- class TReadContext;
-
i16 ApiKey() const override { return PRODUCE; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TProduceResponseData& other) const = default;
@@ -873,10 +855,8 @@ public:
};
PartitionMaxBytesMeta::Type PartitionMaxBytes;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFetchPartition& other) const = default;
@@ -940,10 +920,8 @@ public:
};
PartitionsMeta::Type Partitions;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFetchTopic& other) const = default;
@@ -1019,10 +997,8 @@ public:
};
PartitionsMeta::Type Partitions;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TForgottenTopic& other) const = default;
@@ -1240,11 +1216,9 @@ public:
};
RackIdMeta::Type RackId;
- class TReadContext;
-
i16 ApiKey() const override { return FETCH; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFetchRequestData& other) const = default;
@@ -1337,10 +1311,8 @@ public:
};
EndOffsetMeta::Type EndOffset;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TEpochEndOffset& other) const = default;
@@ -1396,10 +1368,8 @@ public:
};
LeaderEpochMeta::Type LeaderEpoch;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TLeaderIdAndEpoch& other) const = default;
@@ -1455,10 +1425,8 @@ public:
};
EpochMeta::Type Epoch;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TSnapshotId& other) const = default;
@@ -1514,10 +1482,8 @@ public:
};
FirstOffsetMeta::Type FirstOffset;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TAbortedTransaction& other) const = default;
@@ -1732,10 +1698,8 @@ public:
};
RecordsMeta::Type Records;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TPartitionData& other) const = default;
@@ -1799,10 +1763,8 @@ public:
};
PartitionsMeta::Type Partitions;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFetchableTopicResponse& other) const = default;
@@ -1885,11 +1847,9 @@ public:
};
ResponsesMeta::Type Responses;
- class TReadContext;
-
i16 ApiKey() const override { return FETCH; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFetchResponseData& other) const = default;
@@ -1958,10 +1918,8 @@ public:
};
NameMeta::Type Name;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataRequestTopic& other) const = default;
@@ -2044,11 +2002,9 @@ public:
};
IncludeTopicAuthorizedOperationsMeta::Type IncludeTopicAuthorizedOperations;
- class TReadContext;
-
i16 ApiKey() const override { return METADATA; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataRequestData& other) const = default;
@@ -2155,10 +2111,8 @@ public:
};
RackMeta::Type Rack;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataResponseBroker& other) const = default;
@@ -2324,10 +2278,8 @@ public:
};
OfflineReplicasMeta::Type OfflineReplicas;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataResponsePartition& other) const = default;
@@ -2448,10 +2400,8 @@ public:
};
TopicAuthorizedOperationsMeta::Type TopicAuthorizedOperations;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataResponseTopic& other) const = default;
@@ -2573,11 +2523,9 @@ public:
};
ClusterAuthorizedOperationsMeta::Type ClusterAuthorizedOperations;
- class TReadContext;
-
i16 ApiKey() const override { return METADATA; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TMetadataResponseData& other) const = default;
@@ -2634,11 +2582,9 @@ public:
};
ClientSoftwareVersionMeta::Type ClientSoftwareVersion;
- class TReadContext;
-
i16 ApiKey() const override { return API_VERSIONS; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TApiVersionsRequestData& other) const = default;
@@ -2726,10 +2672,8 @@ public:
};
MaxVersionMeta::Type MaxVersion;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TApiVersion& other) const = default;
@@ -2804,10 +2748,8 @@ public:
};
MaxVersionMeta::Type MaxVersion;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TSupportedFeatureKey& other) const = default;
@@ -2882,10 +2824,8 @@ public:
};
MinVersionLevelMeta::Type MinVersionLevel;
- class TReadContext;
-
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TFinalizedFeatureKey& other) const = default;
@@ -3031,11 +2971,9 @@ public:
};
ZkMigrationReadyMeta::Type ZkMigrationReady;
- class TReadContext;
-
i16 ApiKey() const override { return API_VERSIONS; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TApiVersionsResponseData& other) const = default;
@@ -3130,11 +3068,9 @@ public:
};
ProducerEpochMeta::Type ProducerEpoch;
- class TReadContext;
-
i16 ApiKey() const override { return INIT_PRODUCER_ID; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TInitProducerIdRequestData& other) const = default;
@@ -3229,11 +3165,9 @@ public:
};
ProducerEpochMeta::Type ProducerEpoch;
- class TReadContext;
-
i16 ApiKey() const override { return INIT_PRODUCER_ID; };
i32 Size(TKafkaVersion version) const override;
- std::unique_ptr<NKafka::TReadContext> CreateReadContext(TKafkaVersion version) override /*{ return new TReadContext(*this, version); }*/;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
bool operator==(const TInitProducerIdResponseData& other) const = default;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp
index 9604fdaa76..01ef8ba9b8 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp
@@ -2,19 +2,13 @@
namespace NKafka {
-void ErrorOnUnexpectedEnd(std::istream& is) {
- if (is.eof()) {
- ythrow yexception() << "unexpected end of stream";
- }
-}
-
TKafkaWritable& TKafkaWritable::operator<<(const TKafkaRawBytes& val) {
- Os.write(val.data(), val.size());
+ write(val.data(), val.size());
return *this;
}
TKafkaWritable& TKafkaWritable::operator<<(const TKafkaRawString& val) {
- Os.write(val.data(), val.length());
+ write(val.data(), val.length());
return *this;
}
@@ -28,10 +22,15 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
void TKafkaWritable::writeUnsignedVarint(TKafkaUint32 value) {
while ((value & 0xffffff80) != 0L) {
ui8 b = (ui8) ((value & 0x7f) | 0x80);
- Os << b;
+ write((const char*)&b, sizeof(b));
value >>= 7;
}
- Os << (ui8) value;
+ ui8 b = (ui8) value;
+ write((const char*)&b, sizeof(b));
+}
+
+void TKafkaWritable::write(const char* val, size_t length) {
+ Buffer.write(val, length);
}
TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {
@@ -45,17 +44,30 @@ TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {
}
-void TKafkaReadable::read(char* val, int length) {
- Is.read(val, length);
- ErrorOnUnexpectedEnd(Is);
+void TKafkaReadable::read(char* val, size_t length) {
+ checkEof(length);
+ memcpy(val, Is.Data() + Position, length);
+ Position += length;
+}
+
+char TKafkaReadable::get() {
+ char r;
+ read(&r, sizeof(r));
+ return r;
+}
+
+TArrayRef<const char> TKafkaReadable::Bytes(size_t length) {
+ checkEof(length);
+ TArrayRef<const char> r(Is.Data() + Position, length);
+ Position += length;
+ return r;
}
ui32 TKafkaReadable::readUnsignedVarint() {
ui32 value = 0;
ui32 i = 0;
ui16 b;
- while (((b = Is.get()) & 0x80) != 0) {
- ErrorOnUnexpectedEnd(Is);
+ while (((b = get()) & 0x80) != 0) {
value |= ((ui32)(b & 0x7f)) << i;
i += 7;
@@ -64,21 +76,19 @@ ui32 TKafkaReadable::readUnsignedVarint() {
}
}
- ErrorOnUnexpectedEnd(Is);
-
value |= b << i;
return value;
}
-void TKafkaReadable::skip(int length) {
- char buffer[64];
- while (length) {
- int l = std::min(length, 64);
- Is.read(buffer, l);
- length -= l;
- }
+void TKafkaReadable::skip(size_t length) {
+ checkEof(length);
+ Position += length;
+}
- ErrorOnUnexpectedEnd(Is);
+void TKafkaReadable::checkEof(size_t length) {
+ if (Position + length > Is.Size()) {
+ ythrow yexception() << "unexpected end of stream";
+ }
}
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 3f02644272..fe0bbb3a86 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -234,337 +234,6 @@ inline TKafkaInt32 ReadArraySize(TKafkaReadable& readable, TKafkaVersion version
}
-template<typename T>
-void NormalizeNumber(T& value) {
-#ifndef WORDS_BIGENDIAN
- char* b = (char*)&value;
- char* e = b + sizeof(T) - 1;
- while(b < e) {
- std::swap(*b, *e);
- ++b;
- --e;
- }
-#endif
-}
-
-class ReadUnsignedVarintStrategy {
-public:
- void Init() {
- Finished = 0;
- Shift = -1;
- Value = 0;
- }
-
- TReadDemand Next() {
- if (Finished) {
- return NoDemand;
- }
- if (Shift >= 0) {
- Finished = !(Buffer & 0x80);
- ui32 v = Buffer & 0x7F;
- Value |= v << Shift;
- Shift += 7;
- } else {
- Shift = 0;
- }
-
- if (Finished) {
- return NoDemand;
- } else {
- return TReadDemand(&Buffer, sizeof(char));
- }
- }
-
- TKafkaInt32 Value;
-
-private:
- bool Finished;
- char Buffer;
- i8 Shift;
-};
-
-
-
-template<typename Meta, typename TOldSizeType>
-class ReadSizeStrategy {
-public:
- void Init() {
- WasRead = false;
- WasNormalized = false;
- OldValue = 0;
- UnsignedVarintStrategy.Init();
- }
-
- TReadDemand Next(TKafkaVersion version) {
- if (VersionCheck<Meta::FlexibleVersionMin, Meta::FlexibleVersionMax>(version)) {
- return UnsignedVarintStrategy.Next();
- } else {
- if (WasRead) {
- if (!WasNormalized) {
- WasNormalized = true;
- NormalizeNumber(OldValue);
- }
- return NoDemand;
- }
- WasRead = true;
- return TReadDemand((char*)&OldValue, sizeof(TOldSizeType));
- }
- }
-
- TKafkaInt32 Value(TKafkaVersion version) {
- if (VersionCheck<Meta::FlexibleVersionMin, Meta::FlexibleVersionMax>(version)) {
- return UnsignedVarintStrategy.Value - 1;
- } else {
- return OldValue;
- }
- }
-
-private:
- bool WasRead;
- bool WasNormalized;
- TOldSizeType OldValue;
-
- ReadUnsignedVarintStrategy UnsignedVarintStrategy;
-};
-
-
-
-template<typename Meta,
- typename TValueType = typename Meta::Type,
- typename TTypeDesc = typename Meta::TypeDesc>
-class TReadStrategy {
-public:
- template<typename Rule>
- void Init(TValueType& /*value*/, TKafkaVersion /*version*/) {
- WasRead = false;
- }
-
- template<typename Rule>
- TReadDemand Next(TValueType& value, TKafkaVersion version) {
- if (!Rule::Apply(version)) {
- if constexpr (Meta::TypeDesc::Default) {
- value = Meta::Default;
- }
- return NoDemand;
- }
- if (WasRead) {
- NormalizeNumber(value);
- return NoDemand;
- }
- WasRead = true;
- return TReadDemand((char*)&value, sizeof(TValueType));
- }
-
-private:
- bool WasRead;
-};
-
-
-template<typename Meta, typename TValueType>
-class TReadStrategy<Meta, TValueType, TKafkaStructDesc> {
-public:
- template<typename Rule>
- void Init(TValueType& value, TKafkaVersion version) {
- if (Rule::Apply(version)) {
- Context = value.CreateReadContext(version);
- }
- }
-
- template<typename Rule>
- TReadDemand Next(TValueType& /*value*/, TKafkaVersion version) {
- if (Rule::Apply(version)) {
- return Context.get()->Next();
- } else {
- return NoDemand;
- }
- }
-
-private:
- std::unique_ptr<TReadContext> Context;
-};
-
-
-template<typename Meta, typename TValueType>
-class TReadStrategy<Meta, TValueType, TKafkaUuidDesc> {
-public:
- template<typename Rule>
- void Init(TKafkaUuid& /*value*/, TKafkaVersion /*version*/) {
- WasRead = false;
- }
-
- template<typename Rule>
- TReadDemand Next(TKafkaUuid& value, TKafkaVersion version) {
- if (!Rule::Apply(version)) {
- return NoDemand;
- }
- if (WasRead) {
- NormalizeNumber(Buffer[0]);
- NormalizeNumber(Buffer[1]);
- value = TKafkaUuid(Buffer[0], Buffer[1]);
- return NoDemand;
- }
- WasRead = true;
- return TReadDemand((char*)Buffer, sizeof(ui64) << 1);
- }
-
-private:
- bool WasRead;
- ui64 Buffer[2];
-};
-
-
-
-template<typename Meta, typename TValueType>
-class TReadStrategy<Meta, TValueType, TKafkaStringDesc> {
-public:
- template<typename Rule>
- void Init(TKafkaString& /*value*/, TKafkaVersion /*version*/) {
- WasRead = false;
- Size.Init();
- }
-
- template<typename Rule>
- TReadDemand Next(TKafkaString& value, TKafkaVersion version) {
- if (WasRead || !Rule::Apply(version)) {
- return NoDemand;
- }
- auto demand = Size.Next(version);
- if (demand) {
- return demand;
- }
- WasRead = true;
- TKafkaInt32 length = Size.Value(version);
-
- if (length < 0) {
- if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) {
- value = std::nullopt;
- return NoDemand;
- } else {
- ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null";
- }
- } else if (length > Max<i16>()){
- ythrow yexception() << "string field " << Meta::Name << " had invalid length " << length;
- }
-
- value = TString();
- value->ReserveAndResize(length);
- return TReadDemand((char*)value->data(), length);
- }
-
-private:
- bool WasRead;
-
- ReadSizeStrategy<Meta, TKafkaInt16> Size;
-};
-
-
-template<typename Meta, typename Desc>
-class TReadStrategy<Meta, TKafkaRecords, Desc> {
-public:
- template<typename Rule>
- void Init(TKafkaRecords& /*value*/, TKafkaVersion /*version*/) {
- WasRead = false;
- Size.Init();
- }
-
- template<typename Rule>
- TReadDemand Next(TKafkaRecords& value, TKafkaVersion version) {
- if (WasRead || !Rule::Apply(version)) {
- return NoDemand;
- }
- auto demand = Size.Next(version);
- if (demand) {
- return demand;
- }
- WasRead = true;
- TKafkaInt32 length = Size.Value(version);
-
- if (length < 0) {
- if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) {
- value = std::nullopt;
- return NoDemand;
- } else {
- ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null";
- }
- } else if (length > MAX_RECORDS_SIZE) {
- ythrow yexception() << "records fields " << Meta::Name << " has invalid length " << length;
- }
-
- value = TKafkaRawBytes();
- value->Resize(length);
- return TReadDemand(value->data(), length);
- }
-
-private:
- bool WasRead;
-
- ReadSizeStrategy<Meta, TKafkaInt32> Size;
-};
-
-
-template<typename Meta, typename TValueType>
-class TReadStrategy<Meta, TValueType, TKafkaArrayDesc> {
-public:
- template<typename Rule>
- void Init(std::vector<typename Meta::ItemType>& /*value*/, TKafkaVersion /*version*/) {
- ItemStep = -1;
- Size.Init();
- }
-
- template<typename Rule>
- TReadDemand Next(std::vector<typename Meta::ItemType>& value, TKafkaVersion version) {
- if (!Rule::Apply(version)) {
- return NoDemand;
- }
- auto demand = Size.Next(version);
- if (demand) {
- return demand;
- }
- if (-1 == ItemStep) {
- TKafkaInt32 length = Size.Value(version);
-
- if (length < 0) {
- if (VersionCheck<Meta::NullableVersionMin, Meta::NullableVersionMax>(version)) {
- value.resize(0);
- return NoDemand;
- } else {
- ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null";
- }
- }
-
- value.resize(length);
- if (0 == length) {
- return NoDemand;
- }
-
- ItemStep = 0;
- ItemStrategy.template Init<Rule>(value[ItemStep], version);
- } else if (ItemStep == (i32)value.size()) {
- return NoDemand;
- }
-
- demand = ItemStrategy.template Next<Rule>(value[ItemStep], version);
- if (demand) {
- return demand;
- }
-
- ++ItemStep;
- if (ItemStep == (i32)value.size()) {
- return NoDemand;
- }
- ItemStrategy.template Init<Rule>(value[ItemStep], version);
- return ItemStrategy.template Next<Rule>(value[ItemStep], version);
- }
-
-private:
- i32 ItemStep;
-
- ReadSizeStrategy<Meta, TKafkaInt32> Size;
- TReadStrategy<Meta, typename Meta::ItemType, typename Meta::ItemTypeDesc> ItemStrategy;
-};
-
-
-
//
@@ -712,9 +381,7 @@ public:
ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null";
}
} else {
- value = TBuffer();
- value->Resize(length);
- readable.read(value->data(), length);
+ value = readable.Bytes(length);
}
}
diff --git a/ydb/core/kafka_proxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index ed8185637a..eeffb3d604 100644
--- a/ydb/core/kafka_proxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -8,86 +8,11 @@ using namespace NKafka;
void Print(std::string& sb);
-class TReadProcessor {
-public:
- TReadProcessor(std::stringstream& buffer)
- : Buffer(std::istreambuf_iterator<char>(buffer), {})
- , Position(0)
- {
- Print(Buffer);
- }
-
- std::string Buffer;
- size_t Position;
-
- void Read(TMessage* msg, TKafkaVersion version) {
- auto ctx = msg->CreateReadContext(version);
-
-
- while(true) {
- auto demand = ctx->Next();
- Cerr << "TReadProcessor:: demand length=" << demand.GetLength() << ", position=" << Position << ", length=" << Buffer.length() << Endl;
- if (!demand) {
- break;
- }
-
- if (!(Buffer.length() >= Position + demand.GetLength())) {
- EXPECT_TRUE(Buffer.length() >= Position + demand.GetLength());
- return;
- }
-
- if (!demand.Skip()) {
- memcpy(demand.GetBuffer(), Buffer.data() + Position, demand.GetLength());
- }
- Position += demand.Length;
- }
-
- EXPECT_FALSE(Position < Buffer.length());
- }
-};
-
-template<typename Meta>
-class TFieldReadProcessor {
-public:
- NKafka::NPrivate::TReadStrategy<Meta> strategy;
-
- TFieldReadProcessor(std::stringstream& buffer)
- : Buffer(std::istreambuf_iterator<char>(buffer), {})
- , Position(0)
- {
- Print(Buffer);
- }
-
- std::string Buffer;
- size_t Position;
-
- void Read(typename Meta::Type& field, TKafkaVersion version) {
- strategy.template Init<NKafka::NPrivate::ReadFieldRule<Meta>>(field, version);
+static constexpr size_t BUFFER_SIZE = 1 << 16;
- while(true) {
- auto demand = strategy.template Next<NKafka::NPrivate::ReadFieldRule<Meta>>(field, version);
- Cerr << "TFieldReadProcessor:: demand length=" << demand.GetLength() << ", position=" << Position << ", length=" << Buffer.length() << Endl;
- if (!demand) {
- break;
- }
-
- if (!(Buffer.length() >= Position + demand.GetLength())) {
- EXPECT_TRUE(Buffer.length() >= Position + demand.GetLength());
- return;
- }
-
- if (!demand.Skip()) {
- memcpy(demand.GetBuffer(), Buffer.data() + Position, demand.GetLength());
- }
- Position += demand.Length;
- }
-
- EXPECT_FALSE(Position < Buffer.length());
- }
-};
TEST(Serialization, RequestHeader) {
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TRequestHeaderData value;
@@ -99,9 +24,9 @@ TEST(Serialization, RequestHeader) {
TKafkaWritable writable(sb);
value.Write(writable, 1);
- TReadProcessor processor(sb);
TRequestHeaderData result;
- processor.Read(&result, 1);
+ TKafkaReadable readable(sb.GetBuffer());
+ result.Read(readable, 1);
EXPECT_EQ(result.RequestApiKey, 3);
EXPECT_EQ(result.RequestApiVersion, 7);
@@ -110,7 +35,7 @@ TEST(Serialization, RequestHeader) {
}
TEST(Serialization, ResponseHeader) {
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TResponseHeaderData value;
@@ -119,15 +44,15 @@ TEST(Serialization, ResponseHeader) {
TKafkaWritable writable(sb);
value.Write(writable, 0);
- TReadProcessor processor(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TResponseHeaderData result;
- processor.Read(&result, 0);
+ result.Read(readable, 0);
EXPECT_EQ(result.CorrelationId, 13);
}
TEST(Serialization, ApiVersionsRequest) {
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TApiVersionsRequestData value;
@@ -137,10 +62,9 @@ TEST(Serialization, ApiVersionsRequest) {
TKafkaWritable writable(sb);
value.Write(writable, 3);
-
- TReadProcessor processor(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TApiVersionsRequestData result;
- processor.Read(&result, 3);
+ result.Read(readable, 3);
EXPECT_EQ(*result.ClientSoftwareName, "apache-kafka-java");
EXPECT_EQ(*result.ClientSoftwareVersion, "3.4.0");
@@ -149,7 +73,7 @@ TEST(Serialization, ApiVersionsRequest) {
TEST(Serialization, ApiVersionsResponse) {
TString longString = "long-string-value-0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TApiVersionsResponseData value;
@@ -186,9 +110,9 @@ TEST(Serialization, ApiVersionsResponse) {
TKafkaWritable writable(sb);
value.Write(writable, 3);
- TReadProcessor processor(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TApiVersionsResponseData result;
- processor.Read(&result, 3);
+ result.Read(readable, 3);
EXPECT_EQ(result.ErrorCode, 7);
EXPECT_EQ(result.ApiKeys.size(), 2ul);
@@ -211,7 +135,7 @@ TEST(Serialization, ProduceRequest) {
char data0[] = "it-is produce data message 1";
char data1[] = "it-is produce data other message 2";
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TProduceRequestData value;
@@ -222,21 +146,21 @@ TEST(Serialization, ProduceRequest) {
value.TopicData[0].Name = "/it/is/some/topic/name";
value.TopicData[0].PartitionData.resize(2);
value.TopicData[0].PartitionData[0].Index = 0;
- value.TopicData[0].PartitionData[0].Records = { TBuffer(data0, sizeof(data0)) };
+ value.TopicData[0].PartitionData[0].Records = TArrayRef(data0);
value.TopicData[0].PartitionData[1].Index = 1;
value.TopicData[0].PartitionData[1].Records = {};
value.TopicData[1].Name = "/it/is/other/topic/name";
value.TopicData[1].PartitionData.resize(1);
value.TopicData[1].PartitionData[0].Index = 0;
- value.TopicData[1].PartitionData[0].Records = { TBuffer(data1, sizeof(data1)) };
+ value.TopicData[1].PartitionData[0].Records = TArrayRef(data1);
TKafkaWritable writable(sb);
value.Write(writable, 3);
- TReadProcessor processor(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TProduceRequestData result;
- processor.Read(&result, 3);
+ result.Read(readable, 3);
EXPECT_TRUE(result.TransactionalId);
EXPECT_EQ(*result.TransactionalId, "transactional-id-value-123456" );
@@ -248,7 +172,7 @@ TEST(Serialization, ProduceRequest) {
EXPECT_EQ(result.TopicData[0].PartitionData.size(), 2ul);
EXPECT_EQ(result.TopicData[0].PartitionData[0].Index, 0);
EXPECT_TRUE(result.TopicData[0].PartitionData[0].Records);
- EXPECT_EQ(*result.TopicData[0].PartitionData[0].Records, TBuffer(data0, sizeof(data0)));
+ EXPECT_EQ(*result.TopicData[0].PartitionData[0].Records, TArrayRef(data0));
EXPECT_EQ(result.TopicData[0].PartitionData[1].Index, 1);
EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, std::nullopt);
EXPECT_TRUE(result.TopicData[1].Name);
@@ -256,19 +180,19 @@ TEST(Serialization, ProduceRequest) {
EXPECT_EQ(result.TopicData[1].PartitionData.size(), 1ul);
EXPECT_EQ(result.TopicData[1].PartitionData[0].Index, 0);
EXPECT_TRUE(result.TopicData[1].PartitionData[0].Records);
- EXPECT_EQ(*result.TopicData[1].PartitionData[0].Records, TBuffer(data1, sizeof(data1)));
+ EXPECT_EQ(*result.TopicData[1].PartitionData[0].Records, TArrayRef(data1));
}
TEST(Serialization, UnsignedVarint) {
std::vector<ui32> values = {0, 1, 127, 128, 32191};
for(ui32 v : values) {
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TKafkaWritable writable(sb);
- TKafkaReadable redable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
writable.writeUnsignedVarint(v);
- ui32 r = redable.readUnsignedVarint();
+ ui32 r = readable.readUnsignedVarint();
EXPECT_EQ(r, v);
}
}
@@ -277,9 +201,11 @@ TEST(Serialization, UnsignedVarint) {
Meta_##Type_::Type value = Value; \
Meta_##Type_::Type result; \
\
- std::stringstream sb; \
+ TWritableBuf sb(nullptr, BUFFER_SIZE); \
TKafkaWritable writable(sb); \
+ TKafkaReadable readable(sb.GetBuffer()); \
\
+ Y_UNUSED(readable); \
Y_UNUSED(result); \
\
NKafka::NPrivate::TWriteCollector collector;
@@ -311,27 +237,41 @@ TEST(Serialization, TKafkaInt8_NotPresentVersion) {
SIMPLE_HEAD(TKafkaInt8, 37);
NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 0, value);
- sb.get();
- EXPECT_TRUE(sb.eof()); // For version 0 value is not serializable. Stream must be empty
+ EXPECT_EQ(sb.Size(), (size_t)0); // For version 0 value is not serializable. Stream must be empty
EXPECT_EQ(collector.NumTaggedFields, 0u);
- TFieldReadProcessor<Meta_TKafkaInt8> processor(sb);
- processor.Read(value, 0);
-
- //EXPECT_EQ(result, Meta_TKafkaInt8::Default); // For version 0 value is not serializable
+ NKafka::NPrivate::Read<Meta_TKafkaInt8>(readable, 0, result);
+ EXPECT_EQ(result, Meta_TKafkaInt8::Default); // For version 0 value is not serializable
}
TEST(Serialization, TKafkaInt8_PresentVersion_NotTaggedVersion) {
SIMPLE_HEAD(TKafkaInt8, 37);
NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 3, value);
- TFieldReadProcessor<Meta_TKafkaInt8> processor(sb);
- processor.Read(result, 3);
+ NKafka::NPrivate::Read<Meta_TKafkaInt8>(readable, 3, result);
EXPECT_EQ(collector.NumTaggedFields, 0u);
EXPECT_EQ(result, value); // Must read same that write
}
+TEST(Serialization, TKafkaInt8_PresentVersion_TaggedVersion) {
+ SIMPLE_HEAD(TKafkaInt8, 37);
+
+ NKafka::NPrivate::Write<Meta_TKafkaInt8>(collector, writable, 11, value);
+ EXPECT_EQ(collector.NumTaggedFields, 1u);
+
+ NKafka::NPrivate::WriteTag<Meta_TKafkaInt8>(writable, 11, value);
+
+ i32 tag = readable.readUnsignedVarint();
+ EXPECT_EQ(tag, Meta_TKafkaInt8::Tag);
+
+ ui32 size = readable.readUnsignedVarint();
+ EXPECT_EQ(size, sizeof(TKafkaInt8));
+
+ NKafka::NPrivate::ReadTag<Meta_TKafkaInt8>(readable, 11, result);
+ EXPECT_EQ(result, value); // Must read same that write
+}
+
TEST(Serialization, TKafkaInt8_PresentVersion_TaggedVersion_Default) {
SIMPLE_HEAD(TKafkaInt8, Meta_TKafkaInt8::Default);
@@ -402,13 +342,30 @@ TEST(Serialization, TKafkaString_PresentVersion_NotTaggedVersion) {
SIMPLE_HEAD(TKafkaString, { "some value" });
NKafka::NPrivate::Write<Meta_TKafkaString>(collector, writable, 3, value);
- TFieldReadProcessor<Meta_TKafkaString> processor(sb);
- processor.Read(result, 3);
+ NKafka::NPrivate::Read<Meta_TKafkaString>(readable, 3, result);
EXPECT_EQ(collector.NumTaggedFields, 0u);
EXPECT_EQ(result, value); // Must read same that write
}
+TEST(Serialization, TKafkaString_PresentVersion_TaggedVersion) {
+ SIMPLE_HEAD(TKafkaString, { "some value" });
+
+ NKafka::NPrivate::Write<Meta_TKafkaString>(collector, writable, 11, value);
+ EXPECT_EQ(collector.NumTaggedFields, 1u);
+
+ NKafka::NPrivate::WriteTag<Meta_TKafkaString>(writable, 11, value);
+
+ i32 tag = readable.readUnsignedVarint();
+ EXPECT_EQ(tag, Meta_TKafkaString::Tag);
+
+ ui32 size = readable.readUnsignedVarint();
+ EXPECT_EQ(size, value->size() + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1)); // "+1" because serialized as unsigned int, and null serialized with size equals 0
+
+ NKafka::NPrivate::ReadTag<Meta_TKafkaString>(readable, 11, result);
+ EXPECT_EQ(result, value); // Must read same that write
+}
+
TEST(Serialization, TKafkaString_PresentVersion_TaggedVersion_Default) {
SIMPLE_HEAD(TKafkaInt8, Meta_TKafkaInt8::Default);
@@ -449,13 +406,34 @@ TEST(Serialization, TKafkaArray_PresentVersion_NotTaggedVersion) {
SIMPLE_HEAD(TKafkaArray, { "some value" });
NKafka::NPrivate::Write<Meta_TKafkaArray>(collector, writable, 3, value);
- TFieldReadProcessor<Meta_TKafkaArray> processor(sb);
- processor.Read(result, 3);
+ NKafka::NPrivate::Read<Meta_TKafkaArray>(readable, 3, result);
EXPECT_EQ(collector.NumTaggedFields, 0u);
EXPECT_EQ(result, value); // Must read same that write
}
+TEST(Serialization, TKafkaArray_PresentVersion_TaggedVersion) {
+ TString v = "some value";
+ SIMPLE_HEAD(TKafkaArray, { v });
+
+ NKafka::NPrivate::Write<Meta_TKafkaArray>(collector, writable, 11, value);
+ EXPECT_EQ(collector.NumTaggedFields, 1u);
+
+ NKafka::NPrivate::WriteTag<Meta_TKafkaArray>(writable, 11, value);
+
+ i32 tag = readable.readUnsignedVarint();
+ EXPECT_EQ(tag, Meta_TKafkaArray::Tag);
+
+ ui32 size = readable.readUnsignedVarint();
+ EXPECT_EQ(size, v.length() // array element data
+ + NKafka::NPrivate::SizeOfUnsignedVarint(value.size()) // array size
+ + NKafka::NPrivate::SizeOfUnsignedVarint(v.length() + 1) // string size. +1 because null string serialize as 0-length
+ );
+
+ NKafka::NPrivate::ReadTag<Meta_TKafkaArray>(readable, 11, result);
+ EXPECT_EQ(result, value); // Must read same that write
+}
+
TEST(Serialization, TKafkaArray_PresentVersion_TaggedVersion_Default) {
SIMPLE_HEAD(TKafkaArray, {});
@@ -487,20 +465,43 @@ TEST(Serialization, TKafkaBytes_IsDefault) {
Meta_TKafkaBytes::Type value;
EXPECT_TRUE(NKafka::NPrivate::IsDefaultValue<Meta_TKafkaBytes>(value)); // value is std::nullopt
- value = TBuffer();
- value->Resize(10);
+ char v[] = "value";
+ value = TArrayRef<char>(v);
EXPECT_FALSE(NKafka::NPrivate::IsDefaultValue<Meta_TKafkaBytes>(value)); // value is not null
}
TEST(Serialization, TKafkaBytes_PresentVersion_NotTaggedVersion) {
- SIMPLE_HEAD(TKafkaBytes, TBuffer("0123456789", 10));
+ char v[] = "0123456789";
+ SIMPLE_HEAD(TKafkaBytes, TArrayRef(v));
NKafka::NPrivate::Write<Meta_TKafkaBytes>(collector, writable, 3, value);
- TFieldReadProcessor<Meta_TKafkaBytes> processor(sb);
- processor.Read(result, 3);
+ NKafka::NPrivate::Read<Meta_TKafkaBytes>(readable, 3, result);
EXPECT_EQ(collector.NumTaggedFields, 0u);
- EXPECT_EQ(result, value); // Must read same that write
+ EXPECT_EQ(result->size(), value->size());
+ EXPECT_STREQ(result->begin(), value->begin()); // Must read same that write
+}
+
+TEST(Serialization, TKafkaBytes_PresentVersion_TaggedVersion) {
+ char v[] = "0123456789";
+ SIMPLE_HEAD(TKafkaBytes, TArrayRef(v));
+
+ NKafka::NPrivate::Write<Meta_TKafkaBytes>(collector, writable, 11, value);
+ EXPECT_EQ(collector.NumTaggedFields, 1u);
+
+ NKafka::NPrivate::WriteTag<Meta_TKafkaBytes>(writable, 11, value);
+
+ i32 tag = readable.readUnsignedVarint();
+ EXPECT_EQ(tag, Meta_TKafkaArray::Tag);
+
+ ui32 size = readable.readUnsignedVarint();
+ EXPECT_EQ(size, value->size() // byffer data
+ + NKafka::NPrivate::SizeOfUnsignedVarint(value->size() + 1) // buffer size. +1 because null value stored as size 0
+ );
+
+ NKafka::NPrivate::ReadTag<Meta_TKafkaBytes>(readable, 11, result);
+ EXPECT_EQ(result->size(), value->size());
+ EXPECT_STREQ(result->begin(), value->begin()); // Must read same that write
}
TEST(Serialization, TKafkaBytes_PresentVersion_TaggedVersion_Default) {
@@ -516,8 +517,9 @@ TEST(Serialization, TRequestHeaderData_reference) {
ui8 reference[] = {0x00, 0x03, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0D, 0x00, 0x10, 0x63, 0x6C, 0x69, 0x65, 0x6E, 0x74,
0x2D, 0x69, 0x64, 0x2D, 0x73, 0x74, 0x72, 0x69, 0x6E, 0x67, 0x00};
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TKafkaWritable writable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TRequestHeaderData value;
value.RequestApiKey = 3;
@@ -527,16 +529,14 @@ TEST(Serialization, TRequestHeaderData_reference) {
value.Write(writable, 2);
- for(ui8 r : reference) {
- ui8 v = sb.get();
- EXPECT_EQ(v, r);
+ EXPECT_EQ(sb.Size(), sizeof(reference));
+ for(size_t i = 0; i < sizeof(reference); ++i) {
+ EXPECT_EQ(*(sb.Data() + i), reference[i]);
}
- sb.write((char*)reference, sizeof(reference));
- TReadProcessor processor(sb);
TRequestHeaderData result;
- processor.Read(&result, 2);
+ result.Read(readable, 2);
EXPECT_EQ(result.RequestApiKey, 3);
EXPECT_EQ(result.RequestApiVersion, 7);
@@ -572,16 +572,14 @@ TEST(Serialization, TKafkaFloat64_PresentVersion_NotTaggedVersion) {
SIMPLE_HEAD(TKafkaFloat64, 3.1415);
NKafka::NPrivate::Write<Meta_TKafkaFloat64>(collector, writable, 3, value);
- TFieldReadProcessor<Meta_TKafkaFloat64> processor(sb);
- processor.Read(result, 3);
+ NKafka::NPrivate::Read<Meta_TKafkaFloat64>(readable, 3, result);
EXPECT_EQ(collector.NumTaggedFields, 0u);
EXPECT_EQ(result, value); // Must read same that write
- NKafka::NPrivate::Write<Meta_TKafkaFloat64>(collector, writable, 3, value);
- for(ui8 r : reference) {
- ui8 v = sb.get();
- EXPECT_EQ(v, r);
+ EXPECT_EQ(sb.Size(), sizeof(reference));
+ for(size_t i = 0; i < sizeof(reference); ++i) {
+ EXPECT_EQ(*(sb.Data() + i), (char)reference[i]);
}
}
@@ -589,12 +587,12 @@ TEST(Serialization, RequestHeader_reference) {
ui8 reference[] = {0x00, 0x12, 0x00, 0x00, 0x7F, 0x6F, 0x6F, 0x68, 0x00, 0x0A, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63,
0x65, 0x72, 0x2D, 0x31};
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
sb.write((char*)reference, sizeof(reference));
- TReadProcessor processor(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TRequestHeaderData result;
- processor.Read(&result, 1);
+ result.Read(readable, 1);
EXPECT_EQ(result.RequestApiKey, 0x12);
EXPECT_EQ(result.RequestApiVersion, 0x00);
@@ -611,22 +609,25 @@ TEST(Serialization, ProduceRequestData_reference) {
0x6B, 0x61, 0x2D, 0x62, 0x79, 0x74, 0x65, 0x73, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x0D, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6F, 0x6E, 0x2D, 0x32, 0x33, 0x01, 0x00, 0x00};
- std::stringstream sb;
+ TWritableBuf sb(nullptr, BUFFER_SIZE);
TKafkaWritable writable(sb);
- TKafkaReadable readable(sb);
+ TKafkaReadable readable(sb.GetBuffer());
TProduceRequestData value;
value.Acks = 3;
value.TimeoutMs = 5;
value.TransactionalId = "7";
+ char record0[] = "record-13-it-is-kafka-bytes";
+ char record1[] = "record-17-it-is-kafka-bytes";
+
value.TopicData.resize(2);
value.TopicData[0].Name = "partition-11";
value.TopicData[0].PartitionData.resize(3);
value.TopicData[0].PartitionData[0].Index = 13;
- value.TopicData[0].PartitionData[0].Records = TKafkaRawBytes("record-13-it-is-kafka-bytes", 27);
+ value.TopicData[0].PartitionData[0].Records = TKafkaRawBytes(record0, 27);
value.TopicData[0].PartitionData[1].Index = 17;
- value.TopicData[0].PartitionData[1].Records = TKafkaRawBytes("record-17-it-is-kafka-bytes", 27);
+ value.TopicData[0].PartitionData[1].Records = TKafkaRawBytes(record1, 27);
value.TopicData[1].Name = "partition-23";
@@ -634,16 +635,8 @@ TEST(Serialization, ProduceRequestData_reference) {
// Print(sb);
- for(ui8 r : reference) {
- ui8 v = sb.get();
- EXPECT_EQ(v, r);
- }
-
- sb.write((char*)reference, sizeof(reference));
-
- TReadProcessor processor(sb);
TProduceRequestData result;
- processor.Read(&result, 9);
+ result.Read(readable, 9);
EXPECT_EQ(result.Acks, 3);
EXPECT_EQ(result.TimeoutMs, 5);
@@ -653,14 +646,19 @@ TEST(Serialization, ProduceRequestData_reference) {
EXPECT_EQ(result.TopicData[0].Name, "partition-11");
EXPECT_EQ(result.TopicData[0].PartitionData.size(), 3ul);
EXPECT_EQ(result.TopicData[0].PartitionData[0].Index, 13);
- EXPECT_EQ(result.TopicData[0].PartitionData[0].Records, TKafkaRawBytes("record-13-it-is-kafka-bytes", 27));
+ EXPECT_EQ(result.TopicData[0].PartitionData[0].Records, TKafkaRawBytes(record0, 27));
EXPECT_EQ(result.TopicData[0].PartitionData[1].Index, 17);
- EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, TKafkaRawBytes("record-17-it-is-kafka-bytes", 27));
+ EXPECT_EQ(result.TopicData[0].PartitionData[1].Records, TKafkaRawBytes(record1, 27));
EXPECT_EQ(result.TopicData[0].PartitionData[2].Index, 0);
EXPECT_EQ(result.TopicData[0].PartitionData[2].Records, std::nullopt);
EXPECT_EQ(result.TopicData[1].Name, "partition-23");
EXPECT_EQ(result.TopicData[1].PartitionData.size(), 0ul);
+
+ EXPECT_EQ(sb.Size(), sizeof(reference));
+ for(size_t i = 0; i < sizeof(reference); ++i) {
+ EXPECT_EQ(*(sb.Data() + i), (char)reference[i]);
+ }
}
char Hex(const unsigned char c) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3c3fa68ae5..cbe8673356 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1795,6 +1795,10 @@ message TKafkaProxyConfig {
optional int32 ListeningPort = 2 [default = 9092];
optional string SslCertificate = 3;
+
+ optional uint64 MaxMessageSize = 4 [default = 268435456];
+ optional uint64 MaxInflightSize = 5 [default = 268435456];
+ optional uint64 PacketSize = 6 [default = 1500];
}
message TAwsCompatibilityConfig {
diff --git a/ydb/core/raw_socket/sock_impl.h b/ydb/core/raw_socket/sock_impl.h
index 632acaa1cf..5389f0575b 100644
--- a/ydb/core/raw_socket/sock_impl.h
+++ b/ydb/core/raw_socket/sock_impl.h
@@ -98,4 +98,51 @@ public:
}
};
+class TBufferedWriter {
+public:
+ TBufferedWriter(TSocketDescriptor* socket, size_t size)
+ : Socket(socket)
+ , Buffer(size) {
+ }
+
+ void write(const char* src, size_t length) {
+ size_t possible = std::min(length, Buffer.Avail());
+ if (possible > 0) {
+ Buffer.Append(src, possible);
+ }
+ if (0 == Buffer.Avail()) {
+ flush();
+ }
+ size_t left = length - possible;
+ if (left > Buffer.Size()) {
+ Socket->Send(src + possible, left);
+ } else if (left > 0) {
+ Buffer.Append(src + possible, left);
+ }
+ }
+
+ void flush() {
+ if (Buffer.Size() > 0) {
+ Socket->Send(Buffer.Data(), Buffer.Size());
+ Buffer.Clear();
+ }
+ }
+
+ const char* Data() {
+ return Buffer.Data();
+ }
+
+ const TBuffer& GetBuffer() {
+ return Buffer;
+ }
+
+ size_t Size() {
+ return Buffer.Size();
+ }
+
+private:
+ TSocketDescriptor* Socket;
+ TBuffer Buffer;
+};
+
} // namespace NKikimr::NRawSocket