diff options
author | babenko <babenko@yandex-team.com> | 2025-01-02 15:12:46 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-01-02 15:29:58 +0300 |
commit | 272bf996ca8be01fd348e8b858097954a95e45a6 (patch) | |
tree | 1429e852ac86016a3ba5c966d084a349c7980e30 | |
parent | 12bc174eaf16a035b7d1e950e84ada736e962cbe (diff) | |
download | ydb-272bf996ca8be01fd348e8b858097954a95e45a6.tar.gz |
RIP zookeeper support, hope to see you again soon
commit_hash:ab647f4f0ce8f7ce4bcdaa11eaf3ed0dffec8d09
-rw-r--r-- | yt/yt/client/object_client/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/unittests/ya.make | 2 | ||||
-rw-r--r-- | yt/yt/client/unittests/zookeeper_bus_ut.cpp | 165 | ||||
-rw-r--r-- | yt/yt/client/unittests/zookeeper_protocol_ut.cpp | 106 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 4 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/packet.cpp | 294 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/packet.h | 15 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/protocol.cpp | 204 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/protocol.h | 59 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/public.h | 19 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/requests.cpp | 28 | ||||
-rw-r--r-- | yt/yt/client/zookeeper/requests.h | 42 |
12 files changed, 1 insertions, 939 deletions
diff --git a/yt/yt/client/object_client/public.h b/yt/yt/client/object_client/public.h index 5937956d08..7cf79d3b1f 100644 --- a/yt/yt/client/object_client/public.h +++ b/yt/yt/client/object_client/public.h @@ -344,7 +344,7 @@ DEFINE_ENUM(EObjectType, ((ClusterProxyNode) (1500)) // Zookeeper stuff - ((ZookeeperShard) (1400)) + // COMPAT(babenko): drop completely ((ZookeeperShardMap) (1401)) // Flow stuff diff --git a/yt/yt/client/unittests/ya.make b/yt/yt/client/unittests/ya.make index a71a98db2c..bbde30040b 100644 --- a/yt/yt/client/unittests/ya.make +++ b/yt/yt/client/unittests/ya.make @@ -34,8 +34,6 @@ SRCS( validate_logical_type_ut.cpp wire_protocol_ut.cpp ypath_ut.cpp - zookeeper_bus_ut.cpp - zookeeper_protocol_ut.cpp ) INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc) diff --git a/yt/yt/client/unittests/zookeeper_bus_ut.cpp b/yt/yt/client/unittests/zookeeper_bus_ut.cpp deleted file mode 100644 index a3ecdc414d..0000000000 --- a/yt/yt/client/unittests/zookeeper_bus_ut.cpp +++ /dev/null @@ -1,165 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/client/zookeeper/packet.h> - -#include <yt/yt/core/bus/bus.h> -#include <yt/yt/core/bus/client.h> -#include <yt/yt/core/bus/server.h> - -#include <yt/yt/core/bus/tcp/config.h> -#include <yt/yt/core/bus/tcp/client.h> -#include <yt/yt/core/bus/tcp/server.h> - -#include <yt/yt/core/net/socket.h> - -#include <library/cpp/testing/common/network.h> - -#include <library/cpp/yt/threading/event_count.h> - -namespace NYT::NZookeeper { -namespace { - -using namespace NBus; - -//////////////////////////////////////////////////////////////////////////////// - -TSharedRefArray CreateMessage(int size) -{ - auto data = TSharedMutableRef::Allocate(size); - return TSharedRefArray(TSharedRef(data)); -} - -TSharedRefArray Serialize(TString message) -{ - return TSharedRefArray(TSharedRef::FromString(message)); -} - -TString Deserialize(TSharedRefArray message) -{ - YT_ASSERT(message.Size() == 1); - const auto& part = message[0]; - return TString(part.Begin(), part.Size()); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TReplyingBusHandler - : public IMessageHandler -{ -public: - TReplyingBusHandler(TString message) - : Message_(std::move(message)) - { } - - void HandleMessage( - TSharedRefArray message, - IBusPtr replyBus) noexcept override - { - EXPECT_EQ(1, std::ssize(message)); - auto replyMessage = Serialize(Message_); - YT_UNUSED_FUTURE(replyBus->Send(replyMessage)); - } - -private: - const TString Message_; -}; - -class TCheckingBusHandler - : public IMessageHandler -{ -public: - explicit TCheckingBusHandler(int numRepliesWaiting, TString message) - : Message_(std::move(message)) - , NumRepliesWaiting(numRepliesWaiting) - { } - - void WaitUntilDone() - { - Event_.Wait(); - } - -private: - const TString Message_; - - std::atomic<int> NumRepliesWaiting; - NThreading::TEvent Event_; - - void HandleMessage( - TSharedRefArray message, - IBusPtr /*replyBus*/) noexcept override - { - auto value = Deserialize(message); - EXPECT_EQ(Message_, value); - - if (--NumRepliesWaiting == 0) { - Event_.NotifyAll(); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TZookeeperBusTest - : public testing::Test -{ -public: - NTesting::TPortHolder Port; - TString Address; - - TZookeeperBusTest() - { - Port = NTesting::GetFreePort(); - Address = Format("localhost:%v", Port); - } - - IBusServerPtr StartBusServer(IMessageHandlerPtr handler) - { - auto config = TBusServerConfig::CreateTcp(Port); - auto server = CreateBusServer( - config, - GetZookeeperPacketTranscoderFactory()); - server->Start(handler); - return server; - } - - void TestReplies(int numRequests, const TString& message) - { - auto server = StartBusServer(New<TReplyingBusHandler>(message)); - auto client = CreateBusClient( - TBusClientConfig::CreateTcp(Address), - GetZookeeperPacketTranscoderFactory()); - auto handler = New<TCheckingBusHandler>(numRequests, message); - auto bus = client->CreateBus(handler); - - std::vector<TFuture<void>> results; - for (int i = 0; i < numRequests; ++i) { - if (auto result = bus->Send(CreateMessage(10))) { - results.push_back(result); - } - } - - for (const auto& result : results) { - auto error = result.Get(); - EXPECT_TRUE(error.IsOK()); - } - - handler->WaitUntilDone(); - - server->Stop() - .Get() - .ThrowOnError(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -TEST_F(TZookeeperBusTest, Simple) -{ - TestReplies(1, "42"); - TestReplies(100, "abacaba"); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/unittests/zookeeper_protocol_ut.cpp b/yt/yt/client/unittests/zookeeper_protocol_ut.cpp deleted file mode 100644 index 03539fc3c0..0000000000 --- a/yt/yt/client/unittests/zookeeper_protocol_ut.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/client/zookeeper/protocol.h> - -namespace NYT::NZookeeper { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -TEST(TZookeeperProtocolTest, Simple) -{ - const TString LongString{100'000, 'a'}; - - auto writer = CreateZookeeperProtocolWriter(); - writer->WriteByte('$'); - writer->WriteInt(42); - writer->WriteInt(std::numeric_limits<int>::min()); - writer->WriteInt(std::numeric_limits<int>::max()); - writer->WriteLong(123); - writer->WriteLong(std::numeric_limits<i64>::min()); - writer->WriteLong(std::numeric_limits<i64>::max()); - writer->WriteBool(false); - writer->WriteBool(true); - writer->WriteString("abacaba"); - writer->WriteString(LongString); - writer->WriteString(""); - - auto data = writer->Finish(); - auto reader = CreateZookeeperProtocolReader(std::move(data)); - EXPECT_EQ('$', reader->ReadByte()); - EXPECT_EQ(42, reader->ReadInt()); - EXPECT_EQ(std::numeric_limits<int>::min(), reader->ReadInt()); - EXPECT_EQ(std::numeric_limits<int>::max(), reader->ReadInt()); - EXPECT_EQ(123, reader->ReadLong()); - EXPECT_EQ(std::numeric_limits<i64>::min(), reader->ReadLong()); - EXPECT_EQ(std::numeric_limits<i64>::max(), reader->ReadLong()); - EXPECT_EQ(false, reader->ReadBool()); - EXPECT_EQ(true, reader->ReadBool()); - EXPECT_EQ("abacaba", reader->ReadString()); - EXPECT_EQ(LongString, reader->ReadString()); - - EXPECT_FALSE(reader->IsFinished()); - EXPECT_THROW_WITH_SUBSTRING(reader->ValidateFinished(), "Expected end of stream"); - - TString longString; - longString.resize(1'000'000); - reader->ReadString(&longString); - EXPECT_EQ(LongString, LongString); - - EXPECT_TRUE(reader->IsFinished()); - reader->ValidateFinished(); -} - -TEST(TZookeeperProtocolTest, WriterReallocation) -{ - constexpr int Count = 954023; - - auto writer = CreateZookeeperProtocolWriter(); - for (int i = 0; i < Count; ++i) { - writer->WriteLong(i); - } - - auto data = writer->Finish(); - EXPECT_EQ(8u * Count, data.Size()); - - auto reader = CreateZookeeperProtocolReader(std::move(data)); - for (int i = 0; i < Count; ++i) { - EXPECT_EQ(i, reader->ReadLong()); - } - reader->ValidateFinished(); -} - -TEST(TZookeeperProtocolTest, CorruptedInput) -{ - { - auto reader = CreateZookeeperProtocolReader(TSharedRef::MakeEmpty()); - EXPECT_THROW_WITH_SUBSTRING(reader->ReadInt(), "Premature end of stream"); - EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream"); - } - { - auto writer = CreateZookeeperProtocolWriter(); - writer->WriteString(TString{100'000, 'a'}); - auto data = writer->Finish(); - data = data.Slice(0, 12345); - auto reader = CreateZookeeperProtocolReader(std::move(data)); - EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream"); - } - { - auto writer = CreateZookeeperProtocolWriter(); - writer->WriteLong(1234); - writer->WriteLong(std::numeric_limits<i64>::max()); - writer->WriteLong('a'); - writer->WriteLong('b'); - - auto data = writer->Finish(); - auto reader = CreateZookeeperProtocolReader(std::move(data)); - EXPECT_EQ(1234, reader->ReadLong()); - // NB: Offset + Size may overflow here if implemented carelessly. - EXPECT_THROW_WITH_SUBSTRING(reader->ReadString(), "Premature end of stream"); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index cc55baf1a4..f183fca520 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -193,10 +193,6 @@ SRCS( complex_types/uuid_text.cpp complex_types/yson_format_conversion.cpp - zookeeper/packet.cpp - zookeeper/protocol.cpp - zookeeper/requests.cpp - kafka/packet.cpp kafka/protocol.cpp kafka/requests.cpp diff --git a/yt/yt/client/zookeeper/packet.cpp b/yt/yt/client/zookeeper/packet.cpp deleted file mode 100644 index d1dad9427c..0000000000 --- a/yt/yt/client/zookeeper/packet.cpp +++ /dev/null @@ -1,294 +0,0 @@ -#include "packet.h" - -#include <library/cpp/yt/memory/chunked_memory_allocator.h> - -namespace NYT::NZookeeper { - -using namespace NBus; - -//////////////////////////////////////////////////////////////////////////////// - -DEFINE_ENUM(EPacketPhase, - (Header) - (Message) - (Finished) -); - -struct TPacketDecoderTag -{ }; - -//////////////////////////////////////////////////////////////////////////////// - -struct TPacketTranscoderBase -{ - union { - int MessageSize; - char Data[sizeof(int)]; - } Header_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TPacketDecoder - : public IPacketDecoder - , public TPacketTranscoderBase -{ -public: - TPacketDecoder() - : Allocator_( - PacketDecoderChunkSize, - TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio, - GetRefCountedTypeCookie<TPacketDecoderTag>()) - { - Restart(); - } - - void Restart() override - { - PacketSize_ = 0; - Message_.Reset(); - - BeginPhase( - EPacketPhase::Header, - Header_.Data, - sizeof(Header_.Data)); - } - - bool IsInProgress() const override - { - return !IsFinished(); - } - - bool IsFinished() const override - { - return Phase_ == EPacketPhase::Finished; - } - - TMutableRef GetFragment() override - { - return TMutableRef(FragmentPtr_, FragmentRemaining_); - } - - bool Advance(size_t size) override - { - YT_ASSERT(FragmentRemaining_ != 0); - YT_ASSERT(size <= FragmentRemaining_); - - PacketSize_ += size; - FragmentRemaining_ -= size; - FragmentPtr_ += size; - if (FragmentRemaining_ == 0) { - return EndPhase(); - } else { - return true; - } - } - - EPacketType GetPacketType() const override - { - return EPacketType::Message; - } - - EPacketFlags GetPacketFlags() const override - { - return EPacketFlags::None; - } - - TPacketId GetPacketId() const override - { - return {}; - } - - size_t GetPacketSize() const override - { - return PacketSize_; - } - - TSharedRefArray GrabMessage() const override - { - return TSharedRefArray(Message_); - } - -private: - EPacketPhase Phase_ = EPacketPhase::Finished; - char* FragmentPtr_ = nullptr; - size_t FragmentRemaining_ = 0; - - TChunkedMemoryAllocator Allocator_; - static constexpr i64 PacketDecoderChunkSize = 16_KB; - - TSharedRef Message_; - - size_t PacketSize_ = 0; - - void BeginPhase( - EPacketPhase phase, - char* fragmentPtr, - size_t fragmentSize) - { - Phase_ = phase; - FragmentPtr_ = fragmentPtr; - FragmentRemaining_ = fragmentSize; - } - - bool EndPhase() - { - switch (Phase_) { - case EPacketPhase::Header: { - std::reverse(std::begin(Header_.Data), std::end(Header_.Data)); - auto messageSize = Header_.MessageSize; - auto message = Allocator_.AllocateAligned(messageSize); - BeginPhase( - EPacketPhase::Message, - /*fragmentPtr*/ message.begin(), - /*fragmentSize*/ message.size()); - Message_ = std::move(message); - - return true; - } - case EPacketPhase::Message: - BeginPhase( - EPacketPhase::Finished, - /*fragmentPtr*/ nullptr, - /*fragmentSize*/ 0); - - return true; - default: - YT_ABORT(); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TPacketEncoder - : public IPacketEncoder - , public TPacketTranscoderBase -{ -public: - size_t GetPacketSize( - EPacketType type, - const TSharedRefArray& /*message*/, - size_t messageSize) override - { - YT_ASSERT(type == EPacketType::Message); - YT_ASSERT(messageSize > 0); - - return sizeof(Header_) + messageSize; - } - - bool Start( - EPacketType type, - EPacketFlags flags, - bool /*generateChecksums*/, - int /*checksummedPartCount*/, - TPacketId /*packetId*/, - TSharedRefArray messageParts) override - { - YT_ASSERT(type == EPacketType::Message); - YT_ASSERT(flags == EPacketFlags::None); - YT_ASSERT(!messageParts.Empty()); - - i64 messageSize = 0; - for (const auto& messagePart : messageParts) { - messageSize += messagePart.size(); - } - Header_.MessageSize = messageSize; - std::reverse(std::begin(Header_.Data), std::end(Header_.Data)); - - MessageParts_ = std::move(messageParts); - - Phase_ = EPacketPhase::Header; - CurrentMessagePart_ = 0; - - return true; - } - - TMutableRef GetFragment() override - { - switch (Phase_) { - case EPacketPhase::Header: - return TMutableRef(Header_.Data, sizeof(Header_.Data)); - case EPacketPhase::Message: { - const auto& messagePart = MessageParts_[CurrentMessagePart_]; - return TMutableRef( - const_cast<char*>(messagePart.begin()), - messagePart.size()); - } - default: - YT_ABORT(); - } - } - - bool IsFragmentOwned() const override - { - switch (Phase_) { - case EPacketPhase::Header: - return false; - case EPacketPhase::Message: - return true; - default: - YT_ABORT(); - } - } - - void NextFragment() override - { - switch (Phase_) { - case EPacketPhase::Header: - Phase_ = EPacketPhase::Message; - CurrentMessagePart_ = 0; - break; - case EPacketPhase::Message: - if (CurrentMessagePart_ + 1 < MessageParts_.size()) { - ++CurrentMessagePart_; - } else { - Phase_ = EPacketPhase::Finished; - } - break; - default: - YT_ABORT(); - } - } - - bool IsFinished() const override - { - return Phase_ == EPacketPhase::Finished; - } - -private: - EPacketPhase Phase_ = EPacketPhase::Finished; - - TSharedRefArray MessageParts_; - size_t CurrentMessagePart_ = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct TPacketTranscoderFactory - : public IPacketTranscoderFactory -{ - std::unique_ptr<IPacketDecoder> CreateDecoder( - const NLogging::TLogger& /*logger*/, - bool /*verifyChecksum*/) const override - { - return std::make_unique<TPacketDecoder>(); - } - - std::unique_ptr<IPacketEncoder> CreateEncoder( - const NLogging::TLogger& /*logger*/) const override - { - return std::make_unique<TPacketEncoder>(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -IPacketTranscoderFactory* GetZookeeperPacketTranscoderFactory() -{ - return LeakySingleton<TPacketTranscoderFactory>(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/zookeeper/packet.h b/yt/yt/client/zookeeper/packet.h deleted file mode 100644 index dd7eb806e1..0000000000 --- a/yt/yt/client/zookeeper/packet.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/bus/tcp/packet.h> - -namespace NYT::NZookeeper { - -//////////////////////////////////////////////////////////////////////////////// - -NBus::IPacketTranscoderFactory* GetZookeeperPacketTranscoderFactory(); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/zookeeper/protocol.cpp b/yt/yt/client/zookeeper/protocol.cpp deleted file mode 100644 index 857f7b4ddd..0000000000 --- a/yt/yt/client/zookeeper/protocol.cpp +++ /dev/null @@ -1,204 +0,0 @@ -#include "protocol.h" - -#include <yt/yt/core/misc/error.h> - -namespace NYT::NZookeeper { - -//////////////////////////////////////////////////////////////////////////////// - -class TZookeeperProtocolReader - : public IZookeeperProtocolReader -{ -public: - explicit TZookeeperProtocolReader(TSharedRef data) - : Data_(std::move(data)) - { } - - char ReadByte() override - { - return DoReadInt<char>(); - } - - int ReadInt() override - { - return DoReadInt<int>(); - } - - i64 ReadLong() override - { - return DoReadInt<i64>(); - } - - bool ReadBool() override - { - auto value = ReadByte(); - return value > 0; - } - - TString ReadString() override - { - TString result; - ReadString(&result); - - return result; - } - - void ReadString(TString* result) override - { - auto length = ReadInt(); - ValidateSizeAvailable(length); - - result->resize(length); - auto begin = Data_.begin() + Offset_; - std::copy(begin, begin + length, result->begin()); - Offset_ += length; - } - - TSharedRef GetSuffix() const override - { - return Data_.Slice(Offset_, Data_.Size()); - } - - bool IsFinished() const override - { - return Offset_ == std::ssize(Data_); - } - - void ValidateFinished() const override - { - if (!IsFinished()) { - THROW_ERROR_EXCEPTION("Expected end of stream") - << TErrorAttribute("offset", Offset_) - << TErrorAttribute("message_size", Data_.size()); - } - } - -private: - const TSharedRef Data_; - i64 Offset_ = 0; - - template <typename T> - T DoReadInt() - { - ValidateSizeAvailable(sizeof(T)); - - union { - T value; - char bytes[sizeof(T)]; - } result; - - memcpy(result.bytes, Data_.begin() + Offset_, sizeof(T)); - std::reverse(result.bytes, result.bytes + sizeof(T)); - Offset_ += sizeof(T); - - return result.value; - } - - void ValidateSizeAvailable(i64 size) - { - if (std::ssize(Data_) - Offset_ < size) { - THROW_ERROR_EXCEPTION("Premature end of stream while reading %v bytes", size); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<IZookeeperProtocolReader> CreateZookeeperProtocolReader(TSharedRef data) -{ - return std::make_unique<TZookeeperProtocolReader>(std::move(data)); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TZookeeperProtocolWriter - : public IZookeeperProtocolWriter -{ -public: - TZookeeperProtocolWriter() - : Buffer_(AllocateBuffer(InitialBufferSize)) - { } - - void WriteByte(char value) override - { - DoWriteInt(value); - } - - void WriteInt(int value) override - { - DoWriteInt(value); - } - - void WriteLong(i64 value) override - { - DoWriteInt(value); - } - - void WriteBool(bool value) override - { - WriteByte(value ? 1 : 0); - } - - void WriteString(const TString& value) override - { - WriteInt(value.size()); - - EnsureFreeSpace(value.size()); - - std::copy(value.begin(), value.end(), Buffer_.begin() + Size_); - Size_ += value.size(); - } - - TSharedRef Finish() override - { - return Buffer_.Slice(0, Size_); - } - -private: - struct TZookeeperProtocolWriterTag - { }; - - static constexpr i64 InitialBufferSize = 16_KB; - static constexpr i64 BufferSizeMultiplier = 2; - - TSharedMutableRef Buffer_; - i64 Size_ = 0; - - template <typename T> - void DoWriteInt(T value) - { - EnsureFreeSpace(sizeof(T)); - - memcpy(Buffer_.begin() + Size_, &value, sizeof(T)); - std::reverse(Buffer_.begin() + Size_, Buffer_.begin() + Size_ + sizeof(T)); - Size_+= sizeof(T); - } - - void EnsureFreeSpace(i64 size) - { - if (Size_ + size <= std::ssize(Buffer_)) { - return; - } - - auto newSize = std::max<i64>(Size_ + size, Buffer_.size() * BufferSizeMultiplier); - auto newBuffer = AllocateBuffer(newSize); - std::copy(Buffer_.begin(), Buffer_.begin() + Size_, newBuffer.begin()); - Buffer_ = std::move(newBuffer); - } - - static TSharedMutableRef AllocateBuffer(i64 capacity) - { - return TSharedMutableRef::Allocate<TZookeeperProtocolWriterTag>(capacity); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<IZookeeperProtocolWriter> CreateZookeeperProtocolWriter() -{ - return std::make_unique<TZookeeperProtocolWriter>(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/zookeeper/protocol.h b/yt/yt/client/zookeeper/protocol.h deleted file mode 100644 index 29fea1da57..0000000000 --- a/yt/yt/client/zookeeper/protocol.h +++ /dev/null @@ -1,59 +0,0 @@ -#pragma once - -#include "public.h" - -#include <library/cpp/yt/memory/ref.h> - -namespace NYT::NZookeeper { - -//////////////////////////////////////////////////////////////////////////////// - -struct IZookeeperProtocolReader -{ - virtual ~IZookeeperProtocolReader() = default; - - virtual char ReadByte() = 0; - virtual int ReadInt() = 0; - virtual i64 ReadLong() = 0; - - virtual bool ReadBool() = 0; - - virtual TString ReadString() = 0; - virtual void ReadString(TString* result) = 0; - - virtual TSharedRef GetSuffix() const = 0; - - //! Returns true if input is fully consumed and false otherwise. - virtual bool IsFinished() const = 0; - //! Throws an error if input is not fully consumed. Does nothing otherwise. - virtual void ValidateFinished() const = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<IZookeeperProtocolReader> CreateZookeeperProtocolReader(TSharedRef data); - -//////////////////////////////////////////////////////////////////////////////// - -struct IZookeeperProtocolWriter -{ - virtual ~IZookeeperProtocolWriter() = default; - - virtual void WriteByte(char value) = 0; - virtual void WriteInt(int value) = 0; - virtual void WriteLong(i64 value) = 0; - - virtual void WriteBool(bool value) = 0; - - virtual void WriteString(const TString& value) = 0; - - virtual TSharedRef Finish() = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<IZookeeperProtocolWriter> CreateZookeeperProtocolWriter(); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/zookeeper/public.h b/yt/yt/client/zookeeper/public.h deleted file mode 100644 index 7f3c7c6823..0000000000 --- a/yt/yt/client/zookeeper/public.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#include <yt/yt/core/misc/public.h> - -namespace NYT::NZookeeperClient { - -//////////////////////////////////////////////////////////////////////////////// - -using TZookeeperPath = TString; - -//////////////////////////////////////////////////////////////////////////////// - -using TSessionId = ui64; - -constexpr TSessionId NullSessionId = 0; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeperClient diff --git a/yt/yt/client/zookeeper/requests.cpp b/yt/yt/client/zookeeper/requests.cpp deleted file mode 100644 index 26eafd1a03..0000000000 --- a/yt/yt/client/zookeeper/requests.cpp +++ /dev/null @@ -1,28 +0,0 @@ -#include "requests.h" - -namespace NYT::NZookeeper { - -//////////////////////////////////////////////////////////////////////////////// - -void TReqStartSession::Deserialize(IZookeeperProtocolReader* reader) -{ - ProtocolVersion = reader->ReadInt(); - LastZxidSeen = reader->ReadLong(); - Timeout = TDuration::MilliSeconds(reader->ReadInt()); - SessionId = reader->ReadLong(); - Password = reader->ReadString(); - ReadOnly = reader->ReadBool(); -} - -void TRspStartSession::Serialize(IZookeeperProtocolWriter* writer) const -{ - writer->WriteInt(ProtocolVersion); - writer->WriteInt(Timeout.MilliSeconds()); - writer->WriteLong(SessionId); - writer->WriteString(Password); - writer->WriteBool(ReadOnly); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper diff --git a/yt/yt/client/zookeeper/requests.h b/yt/yt/client/zookeeper/requests.h deleted file mode 100644 index 14a1e20c17..0000000000 --- a/yt/yt/client/zookeeper/requests.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "public.h" - -#include "protocol.h" - -namespace NYT::NZookeeper { - -//////////////////////////////////////////////////////////////////////////////// - -DEFINE_ENUM(ERequestType, - ((None) (-1)) -); - -//////////////////////////////////////////////////////////////////////////////// - -struct TReqStartSession -{ - int ProtocolVersion = -1; - i64 LastZxidSeen = -1; - TDuration Timeout = TDuration::Zero(); - i64 SessionId = -1; - TString Password; - bool ReadOnly = false; - - void Deserialize(IZookeeperProtocolReader* reader); -}; - -struct TRspStartSession -{ - int ProtocolVersion = -1; - TDuration Timeout = TDuration::Zero(); - i64 SessionId = -1; - TString Password; - bool ReadOnly = false; - - void Serialize(IZookeeperProtocolWriter* writer) const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NZookeeper |