aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2024-04-11 00:18:40 +0300
committerdon-dron <don-dron@yandex-team.com>2024-04-11 00:25:50 +0300
commit8ee95ccae309141e34f6f447be95731a1b24342d (patch)
tree9cf331e2e9ed845f7b7958b0cead88bd0b277de4 /yt
parentf75da0b4f12053d1e9f6115b785f0f20111de7de (diff)
downloadydb-8ee95ccae309141e34f6f447be95731a1b24342d.tar.gz
YT-21501: Remove ChunkMemoryAllocator from TPacketDecoder
82d49dd66617a5911d6d428a09d09aaf20f98ffb
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/bus/tcp/client.h2
-rw-r--r--yt/yt/core/bus/tcp/packet.cpp38
-rw-r--r--yt/yt/core/bus/tcp/packet.h4
-rw-r--r--yt/yt/core/bus/tcp/server.h2
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp4
-rw-r--r--yt/yt/core/rpc/unittests/lib/common.h4
6 files changed, 14 insertions, 40 deletions
diff --git a/yt/yt/core/bus/tcp/client.h b/yt/yt/core/bus/tcp/client.h
index 9cfbe84beb..1e916236cb 100644
--- a/yt/yt/core/bus/tcp/client.h
+++ b/yt/yt/core/bus/tcp/client.h
@@ -4,6 +4,8 @@
#include "packet.h"
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
namespace NYT::NBus {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/bus/tcp/packet.cpp b/yt/yt/core/bus/tcp/packet.cpp
index aed8a6f932..1ed6f6f6c5 100644
--- a/yt/yt/core/bus/tcp/packet.cpp
+++ b/yt/yt/core/bus/tcp/packet.cpp
@@ -6,8 +6,6 @@
#include <library/cpp/yt/string/guid.h>
-#include <library/cpp/yt/memory/chunked_memory_allocator.h>
-
namespace NYT::NBus {
////////////////////////////////////////////////////////////////////////////////
@@ -17,7 +15,6 @@ constexpr ui32 NullPacketPartSize = 0xffffffff;
constexpr int TypicalPacketPartCount = 16;
constexpr int TypicalVariableHeaderSize = TypicalPacketPartCount * (sizeof(ui32) + sizeof(ui64));
-constexpr i64 PacketDecoderChunkSize = 16_KB;
////////////////////////////////////////////////////////////////////////////////
@@ -162,18 +159,10 @@ class TPacketDecoder
public:
TPacketDecoder(
const NLogging::TLogger& logger,
- bool verifyChecksum,
- IMemoryUsageTrackerPtr memoryUsageTracker)
+ bool verifyChecksum)
: TPacketTranscoderBase(logger)
- , Allocator_(
- PacketDecoderChunkSize,
- TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio,
- GetRefCountedTypeCookie<TPacketDecoderTag>())
, VerifyChecksum_(verifyChecksum)
- , MemoryUsageTracker_(std::move(memoryUsageTracker))
{
- YT_VERIFY(MemoryUsageTracker_);
-
Restart();
}
@@ -207,7 +196,6 @@ public:
Phase_ = EPacketPhase::FixedHeader;
PacketSize_ = 0;
Parts_.clear();
- MemoryGuard_ = TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, 0);
PartIndex_ = -1;
Message_.Reset();
@@ -247,17 +235,12 @@ public:
private:
friend class TPacketTranscoderBase<TPacketDecoder>;
- TChunkedMemoryAllocator Allocator_;
-
std::vector<TSharedRef> Parts_;
- TMemoryUsageTrackerGuard MemoryGuard_;
size_t PacketSize_ = 0;
const bool VerifyChecksum_;
- const IMemoryUsageTrackerPtr MemoryUsageTracker_;
-
bool EndFixedHeaderPhase()
{
if (FixedHeader_.Signature != PacketSignature) {
@@ -360,9 +343,7 @@ private:
} else if (partSize == 0) {
Parts_.push_back(TSharedRef::MakeEmpty());
} else {
- auto part = Allocator_.AllocateAligned(partSize);
- MemoryGuard_.IncrementSize(part.Size());
-
+ TSharedMutableRef part = TSharedMutableRef::Allocate(partSize);
BeginPhase(EPacketPhase::MessagePart, part.Begin(), part.Size());
Parts_.push_back(std::move(part));
break;
@@ -510,17 +491,11 @@ class TPacketTranscoderFactory
: public IPacketTranscoderFactory
{
public:
- TPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker)
- : MemoryUsageTracker_(std::move(memoryUsageTracker))
- {
- YT_VERIFY(MemoryUsageTracker_);
- }
-
std::unique_ptr<IPacketDecoder> CreateDecoder(
const NLogging::TLogger& logger,
bool verifyChecksum) const override
{
- return std::make_unique<TPacketDecoder>(logger, verifyChecksum, MemoryUsageTracker_);
+ return std::make_unique<TPacketDecoder>(logger, verifyChecksum);
}
std::unique_ptr<IPacketEncoder> CreateEncoder(
@@ -528,16 +503,13 @@ public:
{
return std::make_unique<TPacketEncoder>(logger);
}
-
-private:
- const IMemoryUsageTrackerPtr MemoryUsageTracker_;
};
////////////////////////////////////////////////////////////////////////////////
-IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker)
+IPacketTranscoderFactory* GetYTPacketTranscoderFactory()
{
- return LeakySingleton<TPacketTranscoderFactory>(std::move(memoryUsageTracker));
+ return LeakySingleton<TPacketTranscoderFactory>();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/bus/tcp/packet.h b/yt/yt/core/bus/tcp/packet.h
index 2e2a729ee1..05e4693128 100644
--- a/yt/yt/core/bus/tcp/packet.h
+++ b/yt/yt/core/bus/tcp/packet.h
@@ -2,8 +2,6 @@
#include "private.h"
-#include <yt/yt/core/misc/memory_usage_tracker.h>
-
namespace NYT::NBus {
////////////////////////////////////////////////////////////////////////////////
@@ -78,7 +76,7 @@ struct IPacketTranscoderFactory
////////////////////////////////////////////////////////////////////////////////
-IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker());
+IPacketTranscoderFactory* GetYTPacketTranscoderFactory();
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/bus/tcp/server.h b/yt/yt/core/bus/tcp/server.h
index 57b3a0cde9..b7de29a8a3 100644
--- a/yt/yt/core/bus/tcp/server.h
+++ b/yt/yt/core/bus/tcp/server.h
@@ -4,6 +4,8 @@
#include "packet.h"
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
namespace NYT::NBus {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index a43b5c8f3e..b085986df7 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -1261,7 +1261,7 @@ public:
config->Load(Config_, /*postprocess*/ true, /*setDefaults*/ false);
auto client = CreateBusClient(
std::move(config),
- GetYTPacketTranscoderFactory(MemoryUsageTracker_),
+ GetYTPacketTranscoderFactory(),
MemoryUsageTracker_);
return CreateBusChannel(std::move(client));
}
@@ -1301,7 +1301,7 @@ public:
config->Load(Config_, /*postprocess*/ true, /*setDefaults*/ false);
auto client = CreateBusClient(
std::move(config),
- GetYTPacketTranscoderFactory(MemoryUsageTracker_),
+ GetYTPacketTranscoderFactory(),
MemoryUsageTracker_);
return CreateBusChannel(std::move(client));
}
diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h
index ccb272df66..71fd251a80 100644
--- a/yt/yt/core/rpc/unittests/lib/common.h
+++ b/yt/yt/core/rpc/unittests/lib/common.h
@@ -265,7 +265,7 @@ public:
auto busConfig = NYT::NBus::TBusServerConfig::CreateTcp(port);
return CreateBusServer(
busConfig,
- NYT::NBus::GetYTPacketTranscoderFactory(memoryUsageTracker),
+ NYT::NBus::GetYTPacketTranscoderFactory(),
memoryUsageTracker);
}
};
@@ -486,7 +486,7 @@ public:
auto busConfig = NYT::NBus::TBusServerConfig::CreateUds(SocketPath_);
return CreateBusServer(
busConfig,
- NYT::NBus::GetYTPacketTranscoderFactory(memoryUsageTracker),
+ NYT::NBus::GetYTPacketTranscoderFactory(),
memoryUsageTracker);
}