aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-01-02 15:12:46 +0300
committerbabenko <babenko@yandex-team.com>2025-01-02 15:29:58 +0300
commit272bf996ca8be01fd348e8b858097954a95e45a6 (patch)
tree1429e852ac86016a3ba5c966d084a349c7980e30
parent12bc174eaf16a035b7d1e950e84ada736e962cbe (diff)
downloadydb-272bf996ca8be01fd348e8b858097954a95e45a6.tar.gz
RIP zookeeper support, hope to see you again soon
commit_hash:ab647f4f0ce8f7ce4bcdaa11eaf3ed0dffec8d09
-rw-r--r--yt/yt/client/object_client/public.h2
-rw-r--r--yt/yt/client/unittests/ya.make2
-rw-r--r--yt/yt/client/unittests/zookeeper_bus_ut.cpp165
-rw-r--r--yt/yt/client/unittests/zookeeper_protocol_ut.cpp106
-rw-r--r--yt/yt/client/ya.make4
-rw-r--r--yt/yt/client/zookeeper/packet.cpp294
-rw-r--r--yt/yt/client/zookeeper/packet.h15
-rw-r--r--yt/yt/client/zookeeper/protocol.cpp204
-rw-r--r--yt/yt/client/zookeeper/protocol.h59
-rw-r--r--yt/yt/client/zookeeper/public.h19
-rw-r--r--yt/yt/client/zookeeper/requests.cpp28
-rw-r--r--yt/yt/client/zookeeper/requests.h42
12 files changed, 1 insertions, 939 deletions
diff --git a/yt/yt/client/object_client/public.h b/yt/yt/client/object_client/public.h
index 5937956d08..7cf79d3b1f 100644
--- a/yt/yt/client/object_client/public.h
+++ b/yt/yt/client/object_client/public.h
@@ -344,7 +344,7 @@ DEFINE_ENUM(EObjectType,
((ClusterProxyNode) (1500))
// Zookeeper stuff
- ((ZookeeperShard) (1400))
+ // COMPAT(babenko): drop completely
((ZookeeperShardMap) (1401))
// Flow stuff
diff --git a/yt/yt/client/unittests/ya.make b/yt/yt/client/unittests/ya.make
index a71a98db2c..bbde30040b 100644
--- a/yt/yt/client/unittests/ya.make
+++ b/yt/yt/client/unittests/ya.make
@@ -34,8 +34,6 @@ SRCS(
validate_logical_type_ut.cpp
wire_protocol_ut.cpp
ypath_ut.cpp
- zookeeper_bus_ut.cpp
- zookeeper_protocol_ut.cpp
)
INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc)
diff --git a/yt/yt/client/unittests/zookeeper_bus_ut.cpp b/yt/yt/client/unittests/zookeeper_bus_ut.cpp
deleted file mode 100644
index a3ecdc414d..0000000000
--- a/yt/yt/client/unittests/zookeeper_bus_ut.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-#include <yt/yt/core/test_framework/framework.h>
-
-#include <yt/yt/client/zookeeper/packet.h>
-
-#include <yt/yt/core/bus/bus.h>
-#include <yt/yt/core/bus/client.h>
-#include <yt/yt/core/bus/server.h>
-
-#include <yt/yt/core/bus/tcp/config.h>
-#include <yt/yt/core/bus/tcp/client.h>
-#include <yt/yt/core/bus/tcp/server.h>
-
-#include <yt/yt/core/net/socket.h>
-
-#include <library/cpp/testing/common/network.h>
-
-#include <library/cpp/yt/threading/event_count.h>
-
-namespace NYT::NZookeeper {
-namespace {
-
-using namespace NBus;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TSharedRefArray CreateMessage(int size)
-{
- auto data = TSharedMutableRef::Allocate(size);
- return TSharedRefArray(TSharedRef(data));
-}
-
-TSharedRefArray Serialize(TString message)
-{
- return TSharedRefArray(TSharedRef::FromString(message));
-}
-
-TString Deserialize(TSharedRefArray message)
-{
- YT_ASSERT(message.Size() == 1);
- const auto& part = message[0];
- return TString(part.Begin(), part.Size());
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TReplyingBusHandler
- : public IMessageHandler
-{
-public:
- TReplyingBusHandler(TString message)
- : Message_(std::move(message))
- { }
-
- void HandleMessage(
- TSharedRefArray message,
- IBusPtr replyBus) noexcept override
- {
- EXPECT_EQ(1, std::ssize(message));
- auto replyMessage = Serialize(Message_);
- YT_UNUSED_FUTURE(replyBus->Send(replyMessage));
- }
-
-private:
- const TString Message_;
-};
-
-class TCheckingBusHandler
- : public IMessageHandler
-{
-public:
- explicit TCheckingBusHandler(int numRepliesWaiting, TString message)
- : Message_(std::move(message))
- , NumRepliesWaiting(numRepliesWaiting)
- { }
-
- void WaitUntilDone()
- {
- Event_.Wait();
- }
-
-private:
- const TString Message_;
-
- std::atomic<int> NumRepliesWaiting;
- NThreading::TEvent Event_;
-
- void HandleMessage(
- TSharedRefArray message,
- IBusPtr /*replyBus*/) noexcept override
- {
- auto value = Deserialize(message);
- EXPECT_EQ(Message_, value);
-
- if (--NumRepliesWaiting == 0) {
- Event_.NotifyAll();
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TZookeeperBusTest
- : public testing::Test
-{
-public:
- NTesting::TPortHolder Port;
- TString Address;
-
- TZookeeperBusTest()
- {
- Port = NTesting::GetFreePort();
- Address = Format("localhost:%v", Port);
- }
-
- IBusServerPtr StartBusServer(IMessageHandlerPtr handler)
- {
- auto config = TBusServerConfig::CreateTcp(Port);
- auto server = CreateBusServer(
- config,
- GetZookeeperPacketTranscoderFactory());
- server->Start(handler);
- return server;
- }
-
- void TestReplies(int numRequests, const TString& message)
- {
- auto server = StartBusServer(New<TReplyingBusHandler>(message));
- auto client = CreateBusClient(
- TBusClientConfig::CreateTcp(Address),
- GetZookeeperPacketTranscoderFactory());
- auto handler = New<TCheckingBusHandler>(numRequests, message);
- auto bus = client->CreateBus(handler);
-
- std::vector<TFuture<void>> results;
- for (int i = 0; i < numRequests; ++i) {
- if (auto result = bus->Send(CreateMessage(10))) {
- results.push_back(result);
- }
- }
-
- for (const auto& result : results) {
- auto error = result.Get();
- EXPECT_TRUE(error.IsOK());
- }
-
- handler->WaitUntilDone();
-
- server->Stop()
- .Get()
- .ThrowOnError();
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-TEST_F(TZookeeperBusTest, Simple)
-{
- TestReplies(1, "42");
- TestReplies(100, "abacaba");
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/unittests/zookeeper_protocol_ut.cpp b/yt/yt/client/unittests/zookeeper_protocol_ut.cpp
deleted file mode 100644
index 03539fc3c0..0000000000
--- a/yt/yt/client/unittests/zookeeper_protocol_ut.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-#include <yt/yt/core/test_framework/framework.h>
-
-#include <yt/yt/client/zookeeper/protocol.h>
-
-namespace NYT::NZookeeper {
-namespace {
-
-////////////////////////////////////////////////////////////////////////////////
-
-TEST(TZookeeperProtocolTest, Simple)
-{
- const TString LongString{100'000, 'a'};
-
- auto writer = CreateZookeeperProtocolWriter();
- writer->WriteByte('$');
- writer->WriteInt(42);
- writer->WriteInt(std::numeric_limits<int>::min());
- writer->WriteInt(std::numeric_limits<int>::max());
- writer->WriteLong(123);
- writer->WriteLong(std::numeric_limits<i64>::min());
- writer->WriteLong(std::numeric_limits<i64>::max());
- writer->WriteBool(false);
- writer->WriteBool(true);
- writer->WriteString("abacaba");
- writer->WriteString(LongString);
- writer->WriteString("");
-
- auto data = writer->Finish();
- auto reader = CreateZookeeperProtocolReader(std::move(data));
- EXPECT_EQ('$', reader->ReadByte());
- EXPECT_EQ(42, reader->ReadInt());
- EXPECT_EQ(std::numeric_limits<int>::min(), reader->ReadInt());
- EXPECT_EQ(std::numeric_limits<int>::max(), reader->ReadInt());
- EXPECT_EQ(123, reader->ReadLong());
- EXPECT_EQ(std::numeric_limits<i64>::min(), reader->ReadLong());
- EXPECT_EQ(std::numeric_limits<i64>::max(), reader->ReadLong());
- EXPECT_EQ(false, reader->ReadBool());
- EXPECT_EQ(true, reader->ReadBool());
- EXPECT_EQ("abacaba", reader->ReadString());
- EXPECT_EQ(LongString, reader->ReadString());
-
- EXPECT_FALSE(reader->IsFinished());
- EXPECT_THROW_WITH_SUBSTRING(reader->ValidateFinished(), "Expected end of stream");
-
- TString longString;
- longString.resize(1'000'000);
- reader->ReadString(&longString);
- EXPECT_EQ(LongString, LongString);
-
- EXPECT_TRUE(reader->IsFinished());
- reader->ValidateFinished();
-}
-
-TEST(TZookeeperProtocolTest, WriterReallocation)
-{
- constexpr int Count = 954023;
-
- auto writer = CreateZookeeperProtocolWriter();
- for (int i = 0; i < Count; ++i) {
- writer->WriteLong(i);
- }
-
- auto data = writer->Finish();
- EXPECT_EQ(8u * Count, data.Size());
-
- auto reader = CreateZookeeperProtocolReader(std::move(data));
- for (int i = 0; i < Count; ++i) {
- EXPECT_EQ(i, reader->ReadLong());
- }
- reader->ValidateFinished();
-}
-
-TEST(TZookeeperProtocolTest, CorruptedInput)
-{
- {
- auto reader = CreateZookeeperProtocolReader(TSharedRef::MakeEmpty());
- EXPECT_THROW_WITH_SUBSTRING(reader->ReadInt(), "Premature end of stream");
- EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream");
- }
- {
- auto writer = CreateZookeeperProtocolWriter();
- writer->WriteString(TString{100'000, 'a'});
- auto data = writer->Finish();
- data = data.Slice(0, 12345);
- auto reader = CreateZookeeperProtocolReader(std::move(data));
- EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream");
- }
- {
- auto writer = CreateZookeeperProtocolWriter();
- writer->WriteLong(1234);
- writer->WriteLong(std::numeric_limits<i64>::max());
- writer->WriteLong('a');
- writer->WriteLong('b');
-
- auto data = writer->Finish();
- auto reader = CreateZookeeperProtocolReader(std::move(data));
- EXPECT_EQ(1234, reader->ReadLong());
- // NB: Offset + Size may overflow here if implemented carelessly.
- EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream");
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index cc55baf1a4..f183fca520 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -193,10 +193,6 @@ SRCS(
complex_types/uuid_text.cpp
complex_types/yson_format_conversion.cpp
- zookeeper/packet.cpp
- zookeeper/protocol.cpp
- zookeeper/requests.cpp
-
kafka/packet.cpp
kafka/protocol.cpp
kafka/requests.cpp
diff --git a/yt/yt/client/zookeeper/packet.cpp b/yt/yt/client/zookeeper/packet.cpp
deleted file mode 100644
index d1dad9427c..0000000000
--- a/yt/yt/client/zookeeper/packet.cpp
+++ /dev/null
@@ -1,294 +0,0 @@
-#include "packet.h"
-
-#include <library/cpp/yt/memory/chunked_memory_allocator.h>
-
-namespace NYT::NZookeeper {
-
-using namespace NBus;
-
-////////////////////////////////////////////////////////////////////////////////
-
-DEFINE_ENUM(EPacketPhase,
- (Header)
- (Message)
- (Finished)
-);
-
-struct TPacketDecoderTag
-{ };
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TPacketTranscoderBase
-{
- union {
- int MessageSize;
- char Data[sizeof(int)];
- } Header_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPacketDecoder
- : public IPacketDecoder
- , public TPacketTranscoderBase
-{
-public:
- TPacketDecoder()
- : Allocator_(
- PacketDecoderChunkSize,
- TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio,
- GetRefCountedTypeCookie<TPacketDecoderTag>())
- {
- Restart();
- }
-
- void Restart() override
- {
- PacketSize_ = 0;
- Message_.Reset();
-
- BeginPhase(
- EPacketPhase::Header,
- Header_.Data,
- sizeof(Header_.Data));
- }
-
- bool IsInProgress() const override
- {
- return !IsFinished();
- }
-
- bool IsFinished() const override
- {
- return Phase_ == EPacketPhase::Finished;
- }
-
- TMutableRef GetFragment() override
- {
- return TMutableRef(FragmentPtr_, FragmentRemaining_);
- }
-
- bool Advance(size_t size) override
- {
- YT_ASSERT(FragmentRemaining_ != 0);
- YT_ASSERT(size <= FragmentRemaining_);
-
- PacketSize_ += size;
- FragmentRemaining_ -= size;
- FragmentPtr_ += size;
- if (FragmentRemaining_ == 0) {
- return EndPhase();
- } else {
- return true;
- }
- }
-
- EPacketType GetPacketType() const override
- {
- return EPacketType::Message;
- }
-
- EPacketFlags GetPacketFlags() const override
- {
- return EPacketFlags::None;
- }
-
- TPacketId GetPacketId() const override
- {
- return {};
- }
-
- size_t GetPacketSize() const override
- {
- return PacketSize_;
- }
-
- TSharedRefArray GrabMessage() const override
- {
- return TSharedRefArray(Message_);
- }
-
-private:
- EPacketPhase Phase_ = EPacketPhase::Finished;
- char* FragmentPtr_ = nullptr;
- size_t FragmentRemaining_ = 0;
-
- TChunkedMemoryAllocator Allocator_;
- static constexpr i64 PacketDecoderChunkSize = 16_KB;
-
- TSharedRef Message_;
-
- size_t PacketSize_ = 0;
-
- void BeginPhase(
- EPacketPhase phase,
- char* fragmentPtr,
- size_t fragmentSize)
- {
- Phase_ = phase;
- FragmentPtr_ = fragmentPtr;
- FragmentRemaining_ = fragmentSize;
- }
-
- bool EndPhase()
- {
- switch (Phase_) {
- case EPacketPhase::Header: {
- std::reverse(std::begin(Header_.Data), std::end(Header_.Data));
- auto messageSize = Header_.MessageSize;
- auto message = Allocator_.AllocateAligned(messageSize);
- BeginPhase(
- EPacketPhase::Message,
- /*fragmentPtr*/ message.begin(),
- /*fragmentSize*/ message.size());
- Message_ = std::move(message);
-
- return true;
- }
- case EPacketPhase::Message:
- BeginPhase(
- EPacketPhase::Finished,
- /*fragmentPtr*/ nullptr,
- /*fragmentSize*/ 0);
-
- return true;
- default:
- YT_ABORT();
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPacketEncoder
- : public IPacketEncoder
- , public TPacketTranscoderBase
-{
-public:
- size_t GetPacketSize(
- EPacketType type,
- const TSharedRefArray& /*message*/,
- size_t messageSize) override
- {
- YT_ASSERT(type == EPacketType::Message);
- YT_ASSERT(messageSize > 0);
-
- return sizeof(Header_) + messageSize;
- }
-
- bool Start(
- EPacketType type,
- EPacketFlags flags,
- bool /*generateChecksums*/,
- int /*checksummedPartCount*/,
- TPacketId /*packetId*/,
- TSharedRefArray messageParts) override
- {
- YT_ASSERT(type == EPacketType::Message);
- YT_ASSERT(flags == EPacketFlags::None);
- YT_ASSERT(!messageParts.Empty());
-
- i64 messageSize = 0;
- for (const auto& messagePart : messageParts) {
- messageSize += messagePart.size();
- }
- Header_.MessageSize = messageSize;
- std::reverse(std::begin(Header_.Data), std::end(Header_.Data));
-
- MessageParts_ = std::move(messageParts);
-
- Phase_ = EPacketPhase::Header;
- CurrentMessagePart_ = 0;
-
- return true;
- }
-
- TMutableRef GetFragment() override
- {
- switch (Phase_) {
- case EPacketPhase::Header:
- return TMutableRef(Header_.Data, sizeof(Header_.Data));
- case EPacketPhase::Message: {
- const auto& messagePart = MessageParts_[CurrentMessagePart_];
- return TMutableRef(
- const_cast<char*>(messagePart.begin()),
- messagePart.size());
- }
- default:
- YT_ABORT();
- }
- }
-
- bool IsFragmentOwned() const override
- {
- switch (Phase_) {
- case EPacketPhase::Header:
- return false;
- case EPacketPhase::Message:
- return true;
- default:
- YT_ABORT();
- }
- }
-
- void NextFragment() override
- {
- switch (Phase_) {
- case EPacketPhase::Header:
- Phase_ = EPacketPhase::Message;
- CurrentMessagePart_ = 0;
- break;
- case EPacketPhase::Message:
- if (CurrentMessagePart_ + 1 < MessageParts_.size()) {
- ++CurrentMessagePart_;
- } else {
- Phase_ = EPacketPhase::Finished;
- }
- break;
- default:
- YT_ABORT();
- }
- }
-
- bool IsFinished() const override
- {
- return Phase_ == EPacketPhase::Finished;
- }
-
-private:
- EPacketPhase Phase_ = EPacketPhase::Finished;
-
- TSharedRefArray MessageParts_;
- size_t CurrentMessagePart_ = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TPacketTranscoderFactory
- : public IPacketTranscoderFactory
-{
- std::unique_ptr<IPacketDecoder> CreateDecoder(
- const NLogging::TLogger& /*logger*/,
- bool /*verifyChecksum*/) const override
- {
- return std::make_unique<TPacketDecoder>();
- }
-
- std::unique_ptr<IPacketEncoder> CreateEncoder(
- const NLogging::TLogger& /*logger*/) const override
- {
- return std::make_unique<TPacketEncoder>();
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-IPacketTranscoderFactory* GetZookeeperPacketTranscoderFactory()
-{
- return LeakySingleton<TPacketTranscoderFactory>();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/zookeeper/packet.h b/yt/yt/client/zookeeper/packet.h
deleted file mode 100644
index dd7eb806e1..0000000000
--- a/yt/yt/client/zookeeper/packet.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/bus/tcp/packet.h>
-
-namespace NYT::NZookeeper {
-
-////////////////////////////////////////////////////////////////////////////////
-
-NBus::IPacketTranscoderFactory* GetZookeeperPacketTranscoderFactory();
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/zookeeper/protocol.cpp b/yt/yt/client/zookeeper/protocol.cpp
deleted file mode 100644
index 857f7b4ddd..0000000000
--- a/yt/yt/client/zookeeper/protocol.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-#include "protocol.h"
-
-#include <yt/yt/core/misc/error.h>
-
-namespace NYT::NZookeeper {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TZookeeperProtocolReader
- : public IZookeeperProtocolReader
-{
-public:
- explicit TZookeeperProtocolReader(TSharedRef data)
- : Data_(std::move(data))
- { }
-
- char ReadByte() override
- {
- return DoReadInt<char>();
- }
-
- int ReadInt() override
- {
- return DoReadInt<int>();
- }
-
- i64 ReadLong() override
- {
- return DoReadInt<i64>();
- }
-
- bool ReadBool() override
- {
- auto value = ReadByte();
- return value > 0;
- }
-
- TString ReadString() override
- {
- TString result;
- ReadString(&result);
-
- return result;
- }
-
- void ReadString(TString* result) override
- {
- auto length = ReadInt();
- ValidateSizeAvailable(length);
-
- result->resize(length);
- auto begin = Data_.begin() + Offset_;
- std::copy(begin, begin + length, result->begin());
- Offset_ += length;
- }
-
- TSharedRef GetSuffix() const override
- {
- return Data_.Slice(Offset_, Data_.Size());
- }
-
- bool IsFinished() const override
- {
- return Offset_ == std::ssize(Data_);
- }
-
- void ValidateFinished() const override
- {
- if (!IsFinished()) {
- THROW_ERROR_EXCEPTION("Expected end of stream")
- << TErrorAttribute("offset", Offset_)
- << TErrorAttribute("message_size", Data_.size());
- }
- }
-
-private:
- const TSharedRef Data_;
- i64 Offset_ = 0;
-
- template <typename T>
- T DoReadInt()
- {
- ValidateSizeAvailable(sizeof(T));
-
- union {
- T value;
- char bytes[sizeof(T)];
- } result;
-
- memcpy(result.bytes, Data_.begin() + Offset_, sizeof(T));
- std::reverse(result.bytes, result.bytes + sizeof(T));
- Offset_ += sizeof(T);
-
- return result.value;
- }
-
- void ValidateSizeAvailable(i64 size)
- {
- if (std::ssize(Data_) - Offset_ < size) {
- THROW_ERROR_EXCEPTION("Premature end of stream while reading %v bytes", size);
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<IZookeeperProtocolReader> CreateZookeeperProtocolReader(TSharedRef data)
-{
- return std::make_unique<TZookeeperProtocolReader>(std::move(data));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TZookeeperProtocolWriter
- : public IZookeeperProtocolWriter
-{
-public:
- TZookeeperProtocolWriter()
- : Buffer_(AllocateBuffer(InitialBufferSize))
- { }
-
- void WriteByte(char value) override
- {
- DoWriteInt(value);
- }
-
- void WriteInt(int value) override
- {
- DoWriteInt(value);
- }
-
- void WriteLong(i64 value) override
- {
- DoWriteInt(value);
- }
-
- void WriteBool(bool value) override
- {
- WriteByte(value ? 1 : 0);
- }
-
- void WriteString(const TString& value) override
- {
- WriteInt(value.size());
-
- EnsureFreeSpace(value.size());
-
- std::copy(value.begin(), value.end(), Buffer_.begin() + Size_);
- Size_ += value.size();
- }
-
- TSharedRef Finish() override
- {
- return Buffer_.Slice(0, Size_);
- }
-
-private:
- struct TZookeeperProtocolWriterTag
- { };
-
- static constexpr i64 InitialBufferSize = 16_KB;
- static constexpr i64 BufferSizeMultiplier = 2;
-
- TSharedMutableRef Buffer_;
- i64 Size_ = 0;
-
- template <typename T>
- void DoWriteInt(T value)
- {
- EnsureFreeSpace(sizeof(T));
-
- memcpy(Buffer_.begin() + Size_, &value, sizeof(T));
- std::reverse(Buffer_.begin() + Size_, Buffer_.begin() + Size_ + sizeof(T));
- Size_+= sizeof(T);
- }
-
- void EnsureFreeSpace(i64 size)
- {
- if (Size_ + size <= std::ssize(Buffer_)) {
- return;
- }
-
- auto newSize = std::max<i64>(Size_ + size, Buffer_.size() * BufferSizeMultiplier);
- auto newBuffer = AllocateBuffer(newSize);
- std::copy(Buffer_.begin(), Buffer_.begin() + Size_, newBuffer.begin());
- Buffer_ = std::move(newBuffer);
- }
-
- static TSharedMutableRef AllocateBuffer(i64 capacity)
- {
- return TSharedMutableRef::Allocate<TZookeeperProtocolWriterTag>(capacity);
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<IZookeeperProtocolWriter> CreateZookeeperProtocolWriter()
-{
- return std::make_unique<TZookeeperProtocolWriter>();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/zookeeper/protocol.h b/yt/yt/client/zookeeper/protocol.h
deleted file mode 100644
index 29fea1da57..0000000000
--- a/yt/yt/client/zookeeper/protocol.h
+++ /dev/null
@@ -1,59 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <library/cpp/yt/memory/ref.h>
-
-namespace NYT::NZookeeper {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IZookeeperProtocolReader
-{
- virtual ~IZookeeperProtocolReader() = default;
-
- virtual char ReadByte() = 0;
- virtual int ReadInt() = 0;
- virtual i64 ReadLong() = 0;
-
- virtual bool ReadBool() = 0;
-
- virtual TString ReadString() = 0;
- virtual void ReadString(TString* result) = 0;
-
- virtual TSharedRef GetSuffix() const = 0;
-
- //! Returns true if input is fully consumed and false otherwise.
- virtual bool IsFinished() const = 0;
- //! Throws an error if input is not fully consumed. Does nothing otherwise.
- virtual void ValidateFinished() const = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<IZookeeperProtocolReader> CreateZookeeperProtocolReader(TSharedRef data);
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IZookeeperProtocolWriter
-{
- virtual ~IZookeeperProtocolWriter() = default;
-
- virtual void WriteByte(char value) = 0;
- virtual void WriteInt(int value) = 0;
- virtual void WriteLong(i64 value) = 0;
-
- virtual void WriteBool(bool value) = 0;
-
- virtual void WriteString(const TString& value) = 0;
-
- virtual TSharedRef Finish() = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<IZookeeperProtocolWriter> CreateZookeeperProtocolWriter();
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/zookeeper/public.h b/yt/yt/client/zookeeper/public.h
deleted file mode 100644
index 7f3c7c6823..0000000000
--- a/yt/yt/client/zookeeper/public.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/misc/public.h>
-
-namespace NYT::NZookeeperClient {
-
-////////////////////////////////////////////////////////////////////////////////
-
-using TZookeeperPath = TString;
-
-////////////////////////////////////////////////////////////////////////////////
-
-using TSessionId = ui64;
-
-constexpr TSessionId NullSessionId = 0;
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeperClient
diff --git a/yt/yt/client/zookeeper/requests.cpp b/yt/yt/client/zookeeper/requests.cpp
deleted file mode 100644
index 26eafd1a03..0000000000
--- a/yt/yt/client/zookeeper/requests.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-#include "requests.h"
-
-namespace NYT::NZookeeper {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TReqStartSession::Deserialize(IZookeeperProtocolReader* reader)
-{
- ProtocolVersion = reader->ReadInt();
- LastZxidSeen = reader->ReadLong();
- Timeout = TDuration::MilliSeconds(reader->ReadInt());
- SessionId = reader->ReadLong();
- Password = reader->ReadString();
- ReadOnly = reader->ReadBool();
-}
-
-void TRspStartSession::Serialize(IZookeeperProtocolWriter* writer) const
-{
- writer->WriteInt(ProtocolVersion);
- writer->WriteInt(Timeout.MilliSeconds());
- writer->WriteLong(SessionId);
- writer->WriteString(Password);
- writer->WriteBool(ReadOnly);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper
diff --git a/yt/yt/client/zookeeper/requests.h b/yt/yt/client/zookeeper/requests.h
deleted file mode 100644
index 14a1e20c17..0000000000
--- a/yt/yt/client/zookeeper/requests.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include "protocol.h"
-
-namespace NYT::NZookeeper {
-
-////////////////////////////////////////////////////////////////////////////////
-
-DEFINE_ENUM(ERequestType,
- ((None) (-1))
-);
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TReqStartSession
-{
- int ProtocolVersion = -1;
- i64 LastZxidSeen = -1;
- TDuration Timeout = TDuration::Zero();
- i64 SessionId = -1;
- TString Password;
- bool ReadOnly = false;
-
- void Deserialize(IZookeeperProtocolReader* reader);
-};
-
-struct TRspStartSession
-{
- int ProtocolVersion = -1;
- TDuration Timeout = TDuration::Zero();
- i64 SessionId = -1;
- TString Password;
- bool ReadOnly = false;
-
- void Serialize(IZookeeperProtocolWriter* writer) const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NZookeeper