diff options
author | don-dron <don-dron@yandex-team.com> | 2024-04-11 00:18:40 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2024-04-11 00:25:50 +0300 |
commit | 8ee95ccae309141e34f6f447be95731a1b24342d (patch) | |
tree | 9cf331e2e9ed845f7b7958b0cead88bd0b277de4 /yt | |
parent | f75da0b4f12053d1e9f6115b785f0f20111de7de (diff) | |
download | ydb-8ee95ccae309141e34f6f447be95731a1b24342d.tar.gz |
YT-21501: Remove ChunkMemoryAllocator from TPacketDecoder
82d49dd66617a5911d6d428a09d09aaf20f98ffb
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/bus/tcp/client.h | 2 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/packet.cpp | 38 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/packet.h | 4 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/server.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/bus/channel.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/common.h | 4 |
6 files changed, 14 insertions, 40 deletions
diff --git a/yt/yt/core/bus/tcp/client.h b/yt/yt/core/bus/tcp/client.h index 9cfbe84beb..1e916236cb 100644 --- a/yt/yt/core/bus/tcp/client.h +++ b/yt/yt/core/bus/tcp/client.h @@ -4,6 +4,8 @@ #include "packet.h" +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NBus { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/packet.cpp b/yt/yt/core/bus/tcp/packet.cpp index aed8a6f932..1ed6f6f6c5 100644 --- a/yt/yt/core/bus/tcp/packet.cpp +++ b/yt/yt/core/bus/tcp/packet.cpp @@ -6,8 +6,6 @@ #include <library/cpp/yt/string/guid.h> -#include <library/cpp/yt/memory/chunked_memory_allocator.h> - namespace NYT::NBus { //////////////////////////////////////////////////////////////////////////////// @@ -17,7 +15,6 @@ constexpr ui32 NullPacketPartSize = 0xffffffff; constexpr int TypicalPacketPartCount = 16; constexpr int TypicalVariableHeaderSize = TypicalPacketPartCount * (sizeof(ui32) + sizeof(ui64)); -constexpr i64 PacketDecoderChunkSize = 16_KB; //////////////////////////////////////////////////////////////////////////////// @@ -162,18 +159,10 @@ class TPacketDecoder public: TPacketDecoder( const NLogging::TLogger& logger, - bool verifyChecksum, - IMemoryUsageTrackerPtr memoryUsageTracker) + bool verifyChecksum) : TPacketTranscoderBase(logger) - , Allocator_( - PacketDecoderChunkSize, - TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio, - GetRefCountedTypeCookie<TPacketDecoderTag>()) , VerifyChecksum_(verifyChecksum) - , MemoryUsageTracker_(std::move(memoryUsageTracker)) { - YT_VERIFY(MemoryUsageTracker_); - Restart(); } @@ -207,7 +196,6 @@ public: Phase_ = EPacketPhase::FixedHeader; PacketSize_ = 0; Parts_.clear(); - MemoryGuard_ = TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, 0); PartIndex_ = -1; Message_.Reset(); @@ -247,17 +235,12 @@ public: private: friend class TPacketTranscoderBase<TPacketDecoder>; - TChunkedMemoryAllocator Allocator_; - std::vector<TSharedRef> Parts_; - TMemoryUsageTrackerGuard MemoryGuard_; size_t PacketSize_ = 0; const bool VerifyChecksum_; - const IMemoryUsageTrackerPtr MemoryUsageTracker_; - bool EndFixedHeaderPhase() { if (FixedHeader_.Signature != PacketSignature) { @@ -360,9 +343,7 @@ private: } else if (partSize == 0) { Parts_.push_back(TSharedRef::MakeEmpty()); } else { - auto part = Allocator_.AllocateAligned(partSize); - MemoryGuard_.IncrementSize(part.Size()); - + TSharedMutableRef part = TSharedMutableRef::Allocate(partSize); BeginPhase(EPacketPhase::MessagePart, part.Begin(), part.Size()); Parts_.push_back(std::move(part)); break; @@ -510,17 +491,11 @@ class TPacketTranscoderFactory : public IPacketTranscoderFactory { public: - TPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker) - : MemoryUsageTracker_(std::move(memoryUsageTracker)) - { - YT_VERIFY(MemoryUsageTracker_); - } - std::unique_ptr<IPacketDecoder> CreateDecoder( const NLogging::TLogger& logger, bool verifyChecksum) const override { - return std::make_unique<TPacketDecoder>(logger, verifyChecksum, MemoryUsageTracker_); + return std::make_unique<TPacketDecoder>(logger, verifyChecksum); } std::unique_ptr<IPacketEncoder> CreateEncoder( @@ -528,16 +503,13 @@ public: { return std::make_unique<TPacketEncoder>(logger); } - -private: - const IMemoryUsageTrackerPtr MemoryUsageTracker_; }; //////////////////////////////////////////////////////////////////////////////// -IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker) +IPacketTranscoderFactory* GetYTPacketTranscoderFactory() { - return LeakySingleton<TPacketTranscoderFactory>(std::move(memoryUsageTracker)); + return LeakySingleton<TPacketTranscoderFactory>(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/packet.h b/yt/yt/core/bus/tcp/packet.h index 2e2a729ee1..05e4693128 100644 --- a/yt/yt/core/bus/tcp/packet.h +++ b/yt/yt/core/bus/tcp/packet.h @@ -2,8 +2,6 @@ #include "private.h" -#include <yt/yt/core/misc/memory_usage_tracker.h> - namespace NYT::NBus { //////////////////////////////////////////////////////////////////////////////// @@ -78,7 +76,7 @@ struct IPacketTranscoderFactory //////////////////////////////////////////////////////////////////////////////// -IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker()); +IPacketTranscoderFactory* GetYTPacketTranscoderFactory(); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/server.h b/yt/yt/core/bus/tcp/server.h index 57b3a0cde9..b7de29a8a3 100644 --- a/yt/yt/core/bus/tcp/server.h +++ b/yt/yt/core/bus/tcp/server.h @@ -4,6 +4,8 @@ #include "packet.h" +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NBus { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index a43b5c8f3e..b085986df7 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -1261,7 +1261,7 @@ public: config->Load(Config_, /*postprocess*/ true, /*setDefaults*/ false); auto client = CreateBusClient( std::move(config), - GetYTPacketTranscoderFactory(MemoryUsageTracker_), + GetYTPacketTranscoderFactory(), MemoryUsageTracker_); return CreateBusChannel(std::move(client)); } @@ -1301,7 +1301,7 @@ public: config->Load(Config_, /*postprocess*/ true, /*setDefaults*/ false); auto client = CreateBusClient( std::move(config), - GetYTPacketTranscoderFactory(MemoryUsageTracker_), + GetYTPacketTranscoderFactory(), MemoryUsageTracker_); return CreateBusChannel(std::move(client)); } diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h index ccb272df66..71fd251a80 100644 --- a/yt/yt/core/rpc/unittests/lib/common.h +++ b/yt/yt/core/rpc/unittests/lib/common.h @@ -265,7 +265,7 @@ public: auto busConfig = NYT::NBus::TBusServerConfig::CreateTcp(port); return CreateBusServer( busConfig, - NYT::NBus::GetYTPacketTranscoderFactory(memoryUsageTracker), + NYT::NBus::GetYTPacketTranscoderFactory(), memoryUsageTracker); } }; @@ -486,7 +486,7 @@ public: auto busConfig = NYT::NBus::TBusServerConfig::CreateUds(SocketPath_); return CreateBusServer( busConfig, - NYT::NBus::GetYTPacketTranscoderFactory(memoryUsageTracker), + NYT::NBus::GetYTPacketTranscoderFactory(), memoryUsageTracker); } |