aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorni-stoiko <ni-stoiko@yandex-team.com>2023-08-24 21:33:16 +0300
committerni-stoiko <ni-stoiko@yandex-team.com>2023-08-24 22:18:11 +0300
commitf5cce2b33840e0f8eaf4b36e78bcf0bb6aaffb34 (patch)
treedc0dd0491c5ed4356437966a480b9f4cd7d72e10
parentb1210bd96e66a5e7c66cc3c602e3062157170222 (diff)
downloadydb-f5cce2b33840e0f8eaf4b36e78bcf0bb6aaffb34.tar.gz
YT-19720: Using TraceContext AllocationTags in RPC
Attempt to use AllocationTag in RPC
-rw-r--r--yt/yt/core/rpc/client-inl.h7
-rw-r--r--yt/yt/core/rpc/client.cpp7
-rw-r--r--yt/yt/core/rpc/client.h5
-rw-r--r--yt/yt/core/rpc/helpers.cpp19
-rw-r--r--yt/yt/core/rpc/unittests/allocation_tags/ya.make29
-rw-r--r--yt/yt/core/rpc/unittests/bin/main.cpp4
-rw-r--r--yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp6
-rw-r--r--yt/yt/core/rpc/unittests/lib/common.h9
-rw-r--r--yt/yt/core/rpc/unittests/lib/my_service.h77
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp (renamed from yt/yt/core/rpc/unittests/lib/my_service.cpp)68
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.h78
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.proto (renamed from yt/yt/core/rpc/unittests/lib/my_service.proto)16
-rw-r--r--yt/yt/core/rpc/unittests/lib/ya.make4
-rw-r--r--yt/yt/core/rpc/unittests/roaming_channel_ut.cpp4
-rw-r--r--yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp86
-rw-r--r--yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp38
-rw-r--r--yt/yt/core/rpc/unittests/rpc_ut.cpp150
-rw-r--r--yt/yt/core/rpc/unittests/ya.make1
-rw-r--r--yt/yt/core/tracing/trace_context.cpp10
-rw-r--r--yt/yt/core/tracing/trace_context.h2
-rw-r--r--yt/yt/library/ytprof/README.md251
-rw-r--r--yt/yt/library/ytprof/build_info.cpp34
-rw-r--r--yt/yt/library/ytprof/build_info.h30
-rw-r--r--yt/yt/library/ytprof/cpu_profiler.cpp201
-rw-r--r--yt/yt/library/ytprof/cpu_profiler.h71
-rw-r--r--yt/yt/library/ytprof/external_pprof.cpp56
-rw-r--r--yt/yt/library/ytprof/external_pprof.h24
-rw-r--r--yt/yt/library/ytprof/heap_profiler.cpp261
-rw-r--r--yt/yt/library/ytprof/heap_profiler.h40
-rw-r--r--yt/yt/library/ytprof/profile.cpp34
-rw-r--r--yt/yt/library/ytprof/profile.h19
-rw-r--r--yt/yt/library/ytprof/profile.proto209
-rw-r--r--yt/yt/library/ytprof/queue-inl.h72
-rw-r--r--yt/yt/library/ytprof/queue.h45
-rw-r--r--yt/yt/library/ytprof/signal_safe_profiler.cpp370
-rw-r--r--yt/yt/library/ytprof/signal_safe_profiler.h95
-rw-r--r--yt/yt/library/ytprof/spinlock_profiler.cpp225
-rw-r--r--yt/yt/library/ytprof/spinlock_profiler.h82
-rw-r--r--yt/yt/library/ytprof/symbolize.cpp809
-rw-r--r--yt/yt/library/ytprof/symbolize.h30
-rw-r--r--yt/yt/library/ytprof/symbolize_other.cpp35
-rw-r--r--yt/yt/library/ytprof/ya.make74
42 files changed, 3462 insertions, 225 deletions
diff --git a/yt/yt/core/rpc/client-inl.h b/yt/yt/core/rpc/client-inl.h
index f90de1f7171..d2f83cbb11e 100644
--- a/yt/yt/core/rpc/client-inl.h
+++ b/yt/yt/core/rpc/client-inl.h
@@ -54,11 +54,13 @@ TFuture<typename TResponse::TResult> TTypedClientRequest<TRequestMessage, TRespo
auto context = CreateClientContext();
auto requestAttachmentsStream = context->GetRequestAttachmentsStream();
auto responseAttachmentsStream = context->GetResponseAttachmentsStream();
+
typename TResponse::TResult response;
{
- TMemoryTagGuard guard(context->GetResponseMemoryTag());
+ auto traceContextGuard = NTracing::TCurrentTraceContextGuard(context->GetTraceContext());
response = New<TResponse>(std::move(context));
}
+
auto promise = response->GetPromise();
auto requestControl = Send(std::move(response));
if (requestControl) {
@@ -126,7 +128,7 @@ void TTypedClientResponse<TResponseMessage>::SetPromise(const TError& error)
template <class TResponseMessage>
bool TTypedClientResponse<TResponseMessage>::TryDeserializeBody(TRef data, std::optional<NCompression::ECodec> codecId)
{
- TMemoryTagGuard guard(ClientContext_->GetResponseMemoryTag());
+ auto traceContextGuard = NTracing::TCurrentTraceContextGuard(ClientContext_->GetTraceContext());
return codecId
? TryDeserializeProtoWithCompression(this, data, *codecId)
@@ -149,7 +151,6 @@ TIntrusivePtr<T> TProxyBase::CreateRequest(const TMethodDescriptor& methodDescri
request->SetResponseCodec(DefaultResponseCodec_);
request->SetEnableLegacyRpcCodecs(DefaultEnableLegacyRpcCodecs_);
request->SetMultiplexingBand(methodDescriptor.MultiplexingBand);
- request->SetResponseMemoryTag(GetCurrentMemoryTag());
if (methodDescriptor.StreamingEnabled) {
request->ClientAttachmentsStreamingParameters() =
diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp
index 89621fb68e2..fb20e7a2f3d 100644
--- a/yt/yt/core/rpc/client.cpp
+++ b/yt/yt/core/rpc/client.cpp
@@ -40,8 +40,7 @@ TClientContext::TClientContext(
TFeatureIdFormatter featureIdFormatter,
bool responseIsHeavy,
TAttachmentsOutputStreamPtr requestAttachmentsStream,
- TAttachmentsInputStreamPtr responseAttachmentsStream,
- TMemoryTag responseMemoryTag)
+ TAttachmentsInputStreamPtr responseAttachmentsStream)
: RequestId_(requestId)
, TraceContext_(std::move(traceContext))
, Service_(std::move(service))
@@ -50,7 +49,6 @@ TClientContext::TClientContext(
, ResponseHeavy_(responseIsHeavy)
, RequestAttachmentsStream_(std::move(requestAttachmentsStream))
, ResponseAttachmentsStream_(std::move(responseAttachmentsStream))
- , ResponseMemoryTag_(responseMemoryTag)
{ }
////////////////////////////////////////////////////////////////////////////////
@@ -349,8 +347,7 @@ TClientContextPtr TClientRequest::CreateClientContext()
FeatureIdFormatter_,
ResponseHeavy_,
RequestAttachmentsStream_,
- ResponseAttachmentsStream_,
- GetResponseMemoryTag().value_or(GetCurrentMemoryTag()));
+ ResponseAttachmentsStream_);
}
void TClientRequest::OnPullRequestAttachmentsStream()
diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h
index dca8836b2bc..25479128d69 100644
--- a/yt/yt/core/rpc/client.h
+++ b/yt/yt/core/rpc/client.h
@@ -104,7 +104,6 @@ public:
DEFINE_BYVAL_RO_PROPERTY(bool, ResponseHeavy);
DEFINE_BYVAL_RO_PROPERTY(TAttachmentsOutputStreamPtr, RequestAttachmentsStream);
DEFINE_BYVAL_RO_PROPERTY(TAttachmentsInputStreamPtr, ResponseAttachmentsStream);
- DEFINE_BYVAL_RO_PROPERTY(TMemoryTag, ResponseMemoryTag);
public:
TClientContext(
@@ -115,8 +114,7 @@ public:
TFeatureIdFormatter featureIdFormatter,
bool heavy,
TAttachmentsOutputStreamPtr requestAttachmentsStream,
- TAttachmentsInputStreamPtr responseAttachmentsStream,
- TMemoryTag responseMemoryTag);
+ TAttachmentsInputStreamPtr responseAttachmentsStream);
};
DEFINE_REFCOUNTED_TYPE(TClientContext)
@@ -139,7 +137,6 @@ public:
DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, ResponseCodec, NCompression::ECodec::None);
DEFINE_BYVAL_RW_PROPERTY(bool, EnableLegacyRpcCodecs, true);
DEFINE_BYVAL_RW_PROPERTY(bool, GenerateAttachmentChecksums, true);
- DEFINE_BYVAL_RW_PROPERTY(std::optional<TMemoryTag>, ResponseMemoryTag);
DEFINE_BYVAL_RW_PROPERTY(IMemoryReferenceTrackerPtr, MemoryReferenceTracker);
// For testing purposes only.
DEFINE_BYVAL_RW_PROPERTY(std::optional<TDuration>, SendDelay);
diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp
index 1384439803f..a881ac49a4e 100644
--- a/yt/yt/core/rpc/helpers.cpp
+++ b/yt/yt/core/rpc/helpers.cpp
@@ -425,14 +425,23 @@ TTraceContextPtr GetOrCreateHandlerTraceContext(
TTraceContextPtr CreateCallTraceContext(std::string service, std::string method)
{
- auto context = TryGetCurrentTraceContext();
- if (!context) {
+ auto oldTraceContext = TryGetCurrentTraceContext();
+ if (!oldTraceContext) {
return nullptr;
}
- if (!context->IsRecorded()) {
- return context;
+ if (!oldTraceContext->IsRecorded()) {
+ return oldTraceContext;
}
- return context->CreateChild(ConcatToString(TStringBuf("RpcClient:"), service, TStringBuf("."), method));
+
+ auto traceContext = oldTraceContext->CreateChild(Format("RpcClient:%v.%v", service, method));
+
+ traceContext->SetAllocationTagsPtr(oldTraceContext->GetAllocationTagsPtr());
+
+ if (GetCurrentMemoryTag() && !traceContext->FindAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral)) {
+ traceContext->SetAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral, GetCurrentMemoryTag());
+ }
+
+ return traceContext;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/allocation_tags/ya.make b/yt/yt/core/rpc/unittests/allocation_tags/ya.make
new file mode 100644
index 00000000000..722b2a98721
--- /dev/null
+++ b/yt/yt/core/rpc/unittests/allocation_tags/ya.make
@@ -0,0 +1,29 @@
+GTEST(core-rpc-allocation-tags)
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+IF (OS_LINUX)
+ ALLOCATOR(TCMALLOC_256K)
+ENDIF()
+
+PROTO_NAMESPACE(yt)
+
+SRCS(
+ yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
+)
+
+INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc)
+
+PEERDIR(
+ yt/yt/library/ytprof
+ yt/yt/library/profiling
+ yt/yt/core
+ yt/yt/core/rpc/grpc
+ yt/yt/core/rpc/unittests/lib
+ yt/yt/core/test_framework
+ library/cpp/testing/common
+)
+
+SIZE(MEDIUM)
+
+END()
diff --git a/yt/yt/core/rpc/unittests/bin/main.cpp b/yt/yt/core/rpc/unittests/bin/main.cpp
index 1e7103c0409..2b3e4f2c9e7 100644
--- a/yt/yt/core/rpc/unittests/bin/main.cpp
+++ b/yt/yt/core/rpc/unittests/bin/main.cpp
@@ -6,7 +6,7 @@
#include <yt/yt/core/concurrency/thread_pool.h>
-#include <yt/yt/core/rpc/unittests/lib/my_service.h>
+#include <yt/yt/core/rpc/unittests/lib/test_service.h>
using namespace NYT;
using namespace NYT::NBus;
@@ -28,7 +28,7 @@ int main(int argc, char* argv[])
auto server = CreateBusServer(busServer);
auto workerPool = CreateThreadPool(4, "Worker");
- auto service = CreateMyService(workerPool->GetInvoker(), false, /*createChannel*/ {});
+ auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {});
server->RegisterService(service);
server->Start();
diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
index afc0d7fbb90..a033fa9f198 100644
--- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
+++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
@@ -60,7 +60,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
{
auto channel = this->CreateChannel(outerServer.GetAddress());
- TMyProxy proxy(channel);
+ TTestProxy proxy(channel);
auto req = proxy.GetChannelFailureError();
auto error = req->Invoke().Get();
ASSERT_FALSE(error.IsOK());
@@ -79,7 +79,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
ASSERT_FALSE(IsChannelFailureErrorHandled(error));
}));
- TMyProxy proxy(channel);
+ TTestProxy proxy(channel);
auto req = proxy.GetChannelFailureError();
auto error = req->Invoke().Get();
ASSERT_FALSE(error.IsOK());
@@ -99,7 +99,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
ASSERT_TRUE(IsChannelFailureErrorHandled(error));
}));
- TMyProxy proxy(channel);
+ TTestProxy proxy(channel);
auto req = proxy.GetChannelFailureError();
req->set_redirection_address(innerServer.GetAddress());
auto error = req->Invoke().Get();
diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h
index e785c71c870..1e00b584c6a 100644
--- a/yt/yt/core/rpc/unittests/lib/common.h
+++ b/yt/yt/core/rpc/unittests/lib/common.h
@@ -31,7 +31,7 @@
#include <yt/yt/core/rpc/service_detail.h>
#include <yt/yt/core/rpc/stream.h>
-#include <yt/yt/core/rpc/unittests/lib/my_service.h>
+#include <yt/yt/core/rpc/unittests/lib/test_service.h>
#include <yt/yt/core/rpc/unittests/lib/no_baggage_service.h>
#include <yt/yt/core/rpc/grpc/config.h>
@@ -74,10 +74,9 @@ public:
TTestCreateChannelCallback createChannel)
{
Server_ = server;
- MyService_ = CreateMyService(invoker, secure, createChannel);
+ TestService_ = CreateTestService(invoker, secure, createChannel);
NoBaggageService_ = CreateNoBaggageService(invoker);
-
- Server_->RegisterService(MyService_);
+ Server_->RegisterService(TestService_);
Server_->RegisterService(NoBaggageService_);
Server_->Start();
}
@@ -102,7 +101,7 @@ protected:
NTesting::TPortHolder Port_;
TString Address_;
- IMyServicePtr MyService_;
+ ITestServicePtr TestService_;
IServicePtr NoBaggageService_;
IServerPtr Server_;
};
diff --git a/yt/yt/core/rpc/unittests/lib/my_service.h b/yt/yt/core/rpc/unittests/lib/my_service.h
deleted file mode 100644
index 69f3fa9c7b3..00000000000
--- a/yt/yt/core/rpc/unittests/lib/my_service.h
+++ /dev/null
@@ -1,77 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/rpc/client.h>
-#include <yt/yt/core/rpc/service.h>
-
-#include <yt/yt/core/rpc/unittests/lib/my_service.pb.h>
-
-namespace NYT::NRpc {
-
-////////////////////////////////////////////////////////////////////////////////
-
-DEFINE_ENUM(EMyFeature,
- ((Cool) (0))
- ((Great) (1))
-);
-
-class TMyProxy
- : public TProxyBase
-{
-public:
- DEFINE_RPC_PROXY(TMyProxy, MyService,
- .SetProtocolVersion(1)
- .SetFeaturesType<EMyFeature>());
-
- DEFINE_RPC_PROXY_METHOD(NMyRpc, SomeCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, PassCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, RegularAttachments);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, NullAndEmptyAttachments);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, Compression);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, DoNothing);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, CustomMessageError);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, NotRegistered);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, SlowCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, SlowCanceledCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, NoReply);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, FlakyCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, RequireCoolFeature);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, RequestBytesThrottledCall);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, StreamingEcho,
- .SetStreamingEnabled(true));
- DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerStreamsAborted,
- .SetStreamingEnabled(true));
- DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerNotReading,
- .SetStreamingEnabled(true));
- DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerNotWriting,
- .SetStreamingEnabled(true));
- DEFINE_RPC_PROXY_METHOD(NMyRpc, GetTraceBaggage);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, CustomMetadata);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, GetChannelFailureError);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-DECLARE_REFCOUNTED_CLASS(IMyService)
-
-class IMyService
- : public virtual IService
-{
-public:
- virtual TFuture<void> GetSlowCallCanceled() const = 0;
- virtual TFuture<void> GetServerStreamsAborted() const = 0;
-};
-
-DEFINE_REFCOUNTED_TYPE(IMyService)
-
-////////////////////////////////////////////////////////////////////////////////
-
-using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address)>;
-
-IMyServicePtr CreateMyService(
- IInvokerPtr invoker,
- bool secure,
- TTestCreateChannelCallback createChannel);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/unittests/lib/my_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index 0503cfc5225..fdc5354ca6f 100644
--- a/yt/yt/core/rpc/unittests/lib/my_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -1,4 +1,4 @@
-#include "my_service.h"
+#include "test_service.h"
#include <gtest/gtest.h>
@@ -20,24 +20,25 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
-class TMyService
- : public IMyService
+class TTestService
+ : public ITestService
, public TServiceBase
{
public:
- TMyService(
+ TTestService(
IInvokerPtr invoker,
bool secure,
TTestCreateChannelCallback createChannel)
: TServiceBase(
invoker,
- TMyProxy::GetDescriptor(),
+ TTestProxy::GetDescriptor(),
NLogging::TLogger("Main"))
, Secure_(secure)
, CreateChannel_(createChannel)
{
RegisterMethod(RPC_SERVICE_METHOD_DESC(SomeCall));
RegisterMethod(RPC_SERVICE_METHOD_DESC(PassCall));
+ RegisterMethod(RPC_SERVICE_METHOD_DESC(AllocationCall));
RegisterMethod(RPC_SERVICE_METHOD_DESC(RegularAttachments));
RegisterMethod(RPC_SERVICE_METHOD_DESC(NullAndEmptyAttachments));
RegisterMethod(RPC_SERVICE_METHOD_DESC(Compression));
@@ -70,10 +71,10 @@ public:
RegisterMethod(RPC_SERVICE_METHOD_DESC(GetChannelFailureError));
// NB: NotRegisteredCall is not registered intentionally
- DeclareServerFeature(EMyFeature::Great);
+ DeclareServerFeature(ETestFeature::Great);
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, SomeCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, SomeCall)
{
context->SetRequestInfo();
int a = request->a();
@@ -81,7 +82,7 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, PassCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, PassCall)
{
context->SetRequestInfo();
WriteAuthenticationIdentityToProto(response, context->GetAuthenticationIdentity());
@@ -90,7 +91,14 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, RegularAttachments)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, AllocationCall)
+ {
+ context->SetRequestInfo();
+ response->set_allocated_string(TString("r", request->size()));
+ context->Reply();
+ }
+
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, RegularAttachments)
{
for (const auto& attachment : request->Attachments()) {
auto data = TBlob();
@@ -101,7 +109,7 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, NullAndEmptyAttachments)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, NullAndEmptyAttachments)
{
const auto& attachments = request->Attachments();
EXPECT_EQ(2u, attachments.size());
@@ -112,7 +120,7 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, Compression)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, Compression)
{
auto requestCodecId = CheckedEnumCast<NCompression::ECodec>(request->request_codec());
auto serializedRequestBody = SerializeProtoToRefWithCompression(*request, requestCodecId);
@@ -133,26 +141,26 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, DoNothing)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, DoNothing)
{
context->SetRequestInfo();
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, CustomMessageError)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, CustomMessageError)
{
context->SetRequestInfo();
context->Reply(TError(NYT::EErrorCode(42), "Some Error"));
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, SlowCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCall)
{
context->SetRequestInfo();
TDelayedExecutor::WaitForDuration(TDuration::Seconds(1));
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, SlowCanceledCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCanceledCall)
{
try {
context->SetRequestInfo();
@@ -169,15 +177,15 @@ public:
return SlowCallCanceled_.ToFuture();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, RequestBytesThrottledCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequestBytesThrottledCall)
{
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, NoReply)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, NoReply)
{ }
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, StreamingEcho)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, StreamingEcho)
{
context->SetRequestInfo();
@@ -215,7 +223,7 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerStreamsAborted)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerStreamsAborted)
{
context->SetRequestInfo();
@@ -244,7 +252,7 @@ public:
ServerStreamsAborted_.Set();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerNotReading)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerNotReading)
{
context->SetRequestInfo();
@@ -266,7 +274,7 @@ public:
}
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerNotWriting)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerNotWriting)
{
context->SetRequestInfo();
@@ -289,7 +297,7 @@ public:
}
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, FlakyCall)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, FlakyCall)
{
static std::atomic<int> callCount;
@@ -302,14 +310,14 @@ public:
}
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, RequireCoolFeature)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequireCoolFeature)
{
context->SetRequestInfo();
- context->ValidateClientFeature(EMyFeature::Cool);
+ context->ValidateClientFeature(ETestFeature::Cool);
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, GetTraceBaggage)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, GetTraceBaggage)
{
context->SetRequestInfo();
auto* traceContext = NTracing::TryGetCurrentTraceContext();
@@ -317,7 +325,7 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, CustomMetadata)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, CustomMetadata)
{
auto customMetadataExt = context->GetRequestHeader().GetExtension(NRpc::NProto::TCustomMetadataExt::custom_metadata_ext);
for (const auto& [key, value] : customMetadataExt.entries()) {
@@ -326,13 +334,13 @@ public:
context->Reply();
}
- DECLARE_RPC_SERVICE_METHOD(NMyRpc, GetChannelFailureError)
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, GetChannelFailureError)
{
context->SetRequestInfo();
if (request->has_redirection_address()) {
YT_VERIFY(CreateChannel_);
- TMyProxy proxy(CreateChannel_(request->redirection_address()));
+ TTestProxy proxy(CreateChannel_(request->redirection_address()));
auto req = proxy.GetChannelFailureError();
context->ReplyFrom(req->Invoke().AsVoid());
} else {
@@ -367,12 +375,12 @@ private:
////////////////////////////////////////////////////////////////////////////////
-IMyServicePtr CreateMyService(
+ITestServicePtr CreateTestService(
IInvokerPtr invoker,
bool secure,
TTestCreateChannelCallback createChannel)
{
- return New<TMyService>(invoker, secure, createChannel);
+ return New<TTestService>(invoker, secure, createChannel);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h
new file mode 100644
index 00000000000..f03be1a1153
--- /dev/null
+++ b/yt/yt/core/rpc/unittests/lib/test_service.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <yt/yt/core/rpc/client.h>
+#include <yt/yt/core/rpc/service.h>
+
+#include <yt/yt/core/rpc/unittests/lib/test_service.pb.h>
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM(ETestFeature,
+ ((Cool) (0))
+ ((Great) (1))
+);
+
+class TTestProxy
+ : public TProxyBase
+{
+public:
+ DEFINE_RPC_PROXY(TTestProxy, TestService,
+ .SetProtocolVersion(1)
+ .SetFeaturesType<ETestFeature>());
+
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, SomeCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, PassCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, AllocationCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, RegularAttachments);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, NullAndEmptyAttachments);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, Compression);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, DoNothing);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, CustomMessageError);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, NotRegistered);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCanceledCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, NoReply);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, FlakyCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, RequireCoolFeature);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, RequestBytesThrottledCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, StreamingEcho,
+ .SetStreamingEnabled(true));
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerStreamsAborted,
+ .SetStreamingEnabled(true));
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerNotReading,
+ .SetStreamingEnabled(true));
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerNotWriting,
+ .SetStreamingEnabled(true));
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, GetTraceBaggage);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, CustomMetadata);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, GetChannelFailureError);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_CLASS(ITestService)
+
+class ITestService
+ : public virtual IService
+{
+public:
+ virtual TFuture<void> GetSlowCallCanceled() const = 0;
+ virtual TFuture<void> GetServerStreamsAborted() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(ITestService)
+
+////////////////////////////////////////////////////////////////////////////////
+
+using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address)>;
+
+ITestServicePtr CreateTestService(
+ IInvokerPtr invoker,
+ bool secure,
+ TTestCreateChannelCallback createChannel);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/unittests/lib/my_service.proto b/yt/yt/core/rpc/unittests/lib/test_service.proto
index 5997d917d22..ebb62f4eb55 100644
--- a/yt/yt/core/rpc/unittests/lib/my_service.proto
+++ b/yt/yt/core/rpc/unittests/lib/test_service.proto
@@ -1,8 +1,8 @@
-package NMyRpc;
+package NTestRpc;
import "yt_proto/yt/core/misc/proto/guid.proto";
-option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/rpc/unittests;myservice";
+option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/rpc/unittests;testservice";
////////////////////////////////////////////////////////////////////////////////
@@ -33,6 +33,18 @@ message TRspPassCall
////////////////////////////////////////////////////////////////////////////////
+message TReqAllocationCall
+{
+ required int64 size = 1;
+}
+
+message TRspAllocationCall
+{
+ optional bytes allocated_string = 1;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
message TReqRegularAttachments
{
}
diff --git a/yt/yt/core/rpc/unittests/lib/ya.make b/yt/yt/core/rpc/unittests/lib/ya.make
index a2186cf413c..68e2961b799 100644
--- a/yt/yt/core/rpc/unittests/lib/ya.make
+++ b/yt/yt/core/rpc/unittests/lib/ya.make
@@ -4,8 +4,8 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
common.cpp
- my_service.cpp
- my_service.proto
+ test_service.cpp
+ test_service.proto
no_baggage_service.cpp
no_baggage_service.proto
)
diff --git a/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp b/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp
index ffe6a075772..5c1bb6024c6 100644
--- a/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp
+++ b/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp
@@ -147,7 +147,7 @@ TYPED_TEST(TRpcTest, RoamingChannelNever)
{
auto channel = CreateRoamingChannel(New<TOneChannelProvider>(CreateRoamingChannel(New<TNeverProvider>())));
- TMyProxy proxy(std::move(channel));
+ TTestProxy proxy(std::move(channel));
auto req = proxy.SomeCall();
req->set_a(42);
@@ -167,7 +167,7 @@ TYPED_TEST(TRpcTest, RoamingChannelManual)
auto manualProviderWeak = MakeWeak(manualProvider);
manualProvider.Reset();
- TMyProxy proxy(std::move(channel));
+ TTestProxy proxy(std::move(channel));
auto req = proxy.SomeCall();
req->set_a(42);
auto asyncRspOrError = req->Invoke()
diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
new file mode 100644
index 00000000000..b78d4aa79e2
--- /dev/null
+++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
@@ -0,0 +1,86 @@
+#include <yt/yt/core/rpc/unittests/lib/common.h>
+
+#include <yt/yt/library/ytprof/heap_profiler.h>
+
+#if defined(_linux_)
+#include <tcmalloc/common.h>
+#endif
+
+namespace NYT::NRpc {
+namespace {
+
+using namespace NTracing;
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if !defined(_asan_enabled_) && !defined(_msan_enabled_) && defined(_linux_)
+
+template <class TImpl>
+using TRpcTest = TTestBase<TImpl>;
+TYPED_TEST_SUITE(TRpcTest, TAllTransports);
+
+TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
+{
+ static TMemoryTag testMemoryTag = 1 << 20;
+ testMemoryTag++;
+
+ NYTProf::EnableMemoryProfilingTags();
+
+ auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag);
+
+ auto actionQueue = New<TActionQueue>();
+
+ using TRspPtr = typename TTestProxy::TRspAllocationCallPtr;
+ std::vector<TFuture<TRspPtr>> responses;
+
+ TTestProxy proxy(this->CreateChannel());
+
+ constexpr auto size = 1_MB;
+ for (int i = 0; i < 10; ++i) {
+ auto context = CreateTraceContextFromCurrent("ResponseWithAllocationTags");
+ auto contextGuard = TTraceContextGuard(context);
+ context->SetAllocationTag(MemoryTagLiteral, testMemoryTag);
+
+ auto req1 = proxy.AllocationCall();
+ req1->set_size(size);
+
+ auto rspFutureNoProp = req1->Invoke()
+ .Apply(BIND_NO_PROPAGATE([] (const TRspPtr& res) {
+ EXPECT_EQ(TryGetCurrentTraceContext(), nullptr);
+ return res;
+ }).AsyncVia(actionQueue->GetInvoker()));
+ responses.push_back(rspFutureNoProp);
+
+ auto req2 = proxy.AllocationCall();
+ req2->set_size(size);
+
+ auto rspFutureProp = req2->Invoke()
+ .Apply(BIND([testMemoryTag=testMemoryTag] (const TRspPtr& res) {
+ auto localContext = TryGetCurrentTraceContext();
+ EXPECT_NE(localContext, nullptr);
+ if (localContext) {
+ EXPECT_EQ(localContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral).value_or(NullMemoryTag), testMemoryTag);
+ }
+ return res;
+ }).AsyncVia(actionQueue->GetInvoker()));
+ responses.push_back(rspFutureProp);
+ }
+
+ for (auto& rsp : responses) {
+ WaitFor(rsp).ValueOrThrow();
+ }
+
+ auto memoryUsageAfter = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag];
+ auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage;
+ EXPECT_GE(deltaMemoryUsage, 14_MB)
+ << "InitialUsage: " << initialMemoryUsage << std::endl
+ << "After waiting: " << memoryUsageAfter;
+}
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp b/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp
index 2465097b0d1..8691dc4e9a5 100644
--- a/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp
@@ -19,32 +19,50 @@ Y_TEST_HOOK_BEFORE_RUN(GTEST_YT_RPC_SHUTDOWN)
////////////////////////////////////////////////////////////////////////////////
-template <class TMyProxy>
+template <class TTestProxy>
void TestShutdown(const IChannelPtr& channel)
{
- TMyProxy proxy(channel);
+ TTestProxy proxy(channel);
- std::vector<NYT::TFuture<typename TTypedClientResponse<NMyRpc::TRspSomeCall>::TResult>> futures;
- futures.reserve(100000);
- for (int i = 0; i < 100000; ++i) {
+ constexpr int numberOfRequests = 1000;
+ std::vector<TFuture<typename TTypedClientResponse<NTestRpc::TRspSomeCall>::TResult>> futures;
+ futures.reserve(numberOfRequests);
+
+ for (int i = 0; i < numberOfRequests; ++i) {
auto req = proxy.SomeCall();
req->SetTimeout(TDuration::Seconds(1));
req->set_a(42);
futures.push_back(req->Invoke());
}
- NYT::Shutdown();
+ Shutdown(TShutdownOptions{TDuration::Seconds(30), true, 127});
+
+ if (!NYT::IsShutdownStarted()) {
+ Cerr << "Shutdown was not started" << Endl;
+ _exit(2);
+ }
+
+ TFileInput shutdownLogInput((GetOutputPath() / "shutdown.log").GetPath());
+ TString buffer;
+ int exitCode = 1;
+
+ while (shutdownLogInput.ReadLine(buffer)) {
+ Cerr << buffer << Endl;
+ if (exitCode && buffer == "*** Shutdown completed") {
+ exitCode = 0;
+ }
+ }
- for (auto& future : futures) {
- future.Cancel(TError{});
+ if (exitCode) {
+ Cerr << "Shutdown was NOT completed" << Endl;
}
- _exit(0);
+ _exit(exitCode);
}
TYPED_TEST(TRpcShutdownTest, Shutdown)
{
- EXPECT_EXIT(TestShutdown<TMyProxy>(this->CreateChannel()), testing::ExitedWithCode(0), "");
+ EXPECT_EXIT(TestShutdown<TTestProxy>(this->CreateChannel()), ::testing::ExitedWithCode(0), "");
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp
index f7ff93048a1..63148dadc91 100644
--- a/yt/yt/core/rpc/unittests/rpc_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp
@@ -18,17 +18,17 @@ class TNonExistingServiceProxy
public:
DEFINE_RPC_PROXY(TNonExistingServiceProxy, NonExistingService);
- DEFINE_RPC_PROXY_METHOD(NMyRpc, DoNothing);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, DoNothing);
};
-class TMyIncorrectProtocolVersionProxy
+class TTestIncorrectProtocolVersionProxy
: public TProxyBase
{
public:
- DEFINE_RPC_PROXY(TMyIncorrectProtocolVersionProxy, MyService,
+ DEFINE_RPC_PROXY(TTestIncorrectProtocolVersionProxy, TestService,
.SetProtocolVersion(2));
- DEFINE_RPC_PROXY_METHOD(NMyRpc, SomeCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, SomeCall);
};
////////////////////////////////////////////////////////////////////////////////
@@ -57,7 +57,7 @@ TYPED_TEST_SUITE(TGrpcTest, TGrpcOnly);
TYPED_TEST(TRpcTest, Send)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SomeCall();
req->set_a(42);
auto rspOrError = req->Invoke().Get();
@@ -77,7 +77,7 @@ TYPED_TEST(TRpcTest, RetryingSend)
this->CreateChannel());
{
- TMyProxy proxy(channel);
+ TTestProxy proxy(channel);
auto req = proxy.FlakyCall();
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError);
@@ -92,7 +92,7 @@ TYPED_TEST(TRpcTest, RetryingSend)
TYPED_TEST(TRpcTest, UserTag)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.PassCall();
req->SetUser("test-user");
req->SetUserTag("test-user-tag");
@@ -108,7 +108,7 @@ TYPED_TEST(TRpcTest, UserTag)
TYPED_TEST(TNotUdsTest, Address)
{
auto testChannel = [] (IChannelPtr channel) {
- TMyProxy proxy(std::move(channel));
+ TTestProxy proxy(std::move(channel));
auto req = proxy.SomeCall();
req->set_a(42);
auto rspOrError = req->Invoke().Get();
@@ -133,7 +133,7 @@ TYPED_TEST(TNotUdsTest, Address)
TYPED_TEST(TNotGrpcTest, SendSimple)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.PassCall();
req->SetUser("test-user");
req->SetMutationId(TGuid::Create());
@@ -149,7 +149,7 @@ TYPED_TEST(TNotGrpcTest, SendSimple)
TYPED_TEST(TNotGrpcTest, StreamingEcho)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultRequestCodec(NCompression::ECodec::Lz4);
proxy.SetDefaultResponseCodec(NCompression::ECodec::Zstd_1);
proxy.SetDefaultEnableLegacyRpcCodecs(false);
@@ -223,7 +223,7 @@ TYPED_TEST(TNotGrpcTest, StreamingEcho)
TYPED_TEST(TNotGrpcTest, ClientStreamsAborted)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.StreamingEcho();
req->SetTimeout(TDuration::MilliSeconds(100));
@@ -243,20 +243,20 @@ TYPED_TEST(TNotGrpcTest, ClientStreamsAborted)
TYPED_TEST(TNotGrpcTest, ServerStreamsAborted)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.ServerStreamsAborted();
req->SetTimeout(TDuration::MilliSeconds(100));
auto rspOrError = WaitFor(req->Invoke());
EXPECT_EQ(NYT::EErrorCode::Timeout, rspOrError.GetCode());
- WaitFor(this->MyService_->GetServerStreamsAborted())
+ WaitFor(this->TestService_->GetServerStreamsAborted())
.ThrowOnError();
}
TYPED_TEST(TNotGrpcTest, ClientNotReading)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultServerAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(250);
for (auto sleep : {false, true}) {
@@ -287,7 +287,7 @@ TYPED_TEST(TNotGrpcTest, ClientNotReading)
TYPED_TEST(TNotGrpcTest, ClientNotWriting)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(250);
for (auto sleep : {false, true}) {
@@ -318,7 +318,7 @@ TYPED_TEST(TNotGrpcTest, ClientNotWriting)
TYPED_TEST(TNotGrpcTest, ServerNotReading)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultClientAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(250);
for (auto sleep : {false, true}) {
@@ -339,13 +339,13 @@ TYPED_TEST(TNotGrpcTest, ServerNotReading)
EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode());
}
- WaitFor(this->MyService_->GetSlowCallCanceled())
+ WaitFor(this->TestService_->GetSlowCallCanceled())
.ThrowOnError();
}
TYPED_TEST(TNotGrpcTest, ServerNotWriting)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultClientAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(250);
for (auto sleep : {false, true}) {
@@ -365,13 +365,13 @@ TYPED_TEST(TNotGrpcTest, ServerNotWriting)
EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode());
}
- WaitFor(this->MyService_->GetSlowCallCanceled())
+ WaitFor(this->TestService_->GetSlowCallCanceled())
.ThrowOnError();
}
TYPED_TEST(TNotGrpcTest, LaggyStreamingRequest)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500);
proxy.DefaultClientAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(500);
@@ -394,7 +394,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest)
{
auto configText = TString(R"({
services = {
- MyService = {
+ TestService = {
pending_payloads_timeout = 250;
};
};
@@ -402,7 +402,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest)
auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText));
this->Server_->Configure(config);
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500);
auto start = Now();
@@ -437,7 +437,7 @@ TYPED_TEST(TNotGrpcTest, TraceBaggagePropagation)
baggage->Set("key2", "value2");
traceContext->PackBaggage(ConvertToAttributes(baggage));
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.GetTraceBaggage();
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK());
@@ -472,12 +472,12 @@ TYPED_TEST(TRpcTest, ManyAsyncRequests)
std::vector<TFuture<void>> asyncResults;
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
for (int i = 0; i < RequestCount; ++i) {
auto request = proxy.SomeCall();
request->set_a(i);
- auto asyncResult = request->Invoke().Apply(BIND([=] (TMyProxy::TRspSomeCallPtr rsp) {
+ auto asyncResult = request->Invoke().Apply(BIND([=] (TTestProxy::TRspSomeCallPtr rsp) {
EXPECT_EQ(i + 100, rsp->b());
}));
asyncResults.push_back(asyncResult);
@@ -488,12 +488,12 @@ TYPED_TEST(TRpcTest, ManyAsyncRequests)
TYPED_TEST(TRpcTest, RegularAttachments)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.RegularAttachments();
req->Attachments().push_back(TSharedRef::FromString("Hello"));
req->Attachments().push_back(TSharedRef::FromString("from"));
- req->Attachments().push_back(TSharedRef::FromString("TMyProxy"));
+ req->Attachments().push_back(TSharedRef::FromString("TTestProxy"));
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK());
@@ -503,12 +503,12 @@ TYPED_TEST(TRpcTest, RegularAttachments)
EXPECT_EQ(3u, attachments.size());
EXPECT_EQ("Hello_", StringFromSharedRef(attachments[0]));
EXPECT_EQ("from_", StringFromSharedRef(attachments[1]));
- EXPECT_EQ("TMyProxy_", StringFromSharedRef(attachments[2]));
+ EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2]));
}
TYPED_TEST(TRpcTest, NullAndEmptyAttachments)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.NullAndEmptyAttachments();
req->Attachments().push_back(TSharedRef());
@@ -537,7 +537,7 @@ TYPED_TEST(TNotGrpcTest, Compression)
"According to all known laws of aviation, there is no way that a bee should be able to fly."
});
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultRequestCodec(requestCodecId);
proxy.SetDefaultResponseCodec(responseCodecId);
proxy.SetDefaultEnableLegacyRpcCodecs(false);
@@ -579,16 +579,16 @@ TYPED_TEST(TRpcTest, ResponseMemoryTag)
testMemoryTag++;
auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag);
- std::vector<TMyProxy::TRspPassCallPtr> rsps;
+ std::vector<TTestProxy::TRspPassCallPtr> rsps;
{
- TMyProxy proxy(this->CreateChannel());
- TString longString(100, 'a');
+ TTestProxy proxy(this->CreateChannel());
+ TString userName("user");
TMemoryTagGuard guard(testMemoryTag);
- for (int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 1000; ++i) {
auto req = proxy.PassCall();
- req->SetUser(longString);
+ req->SetUser(userName);
req->SetMutationId(TGuid::Create());
req->SetRetry(false);
auto err = req->Invoke().Get();
@@ -597,7 +597,7 @@ TYPED_TEST(TRpcTest, ResponseMemoryTag)
}
auto currentMemoryUsage = GetMemoryUsageForTag(testMemoryTag);
- EXPECT_GE(currentMemoryUsage - initialMemoryUsage, 500'000u)
+ EXPECT_GE(currentMemoryUsage - initialMemoryUsage, 256_KB)
<< "InitialUsage: " << initialMemoryUsage << std::endl
<< "Current: " << currentMemoryUsage;
}
@@ -608,7 +608,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
{
auto configText = TString(R"({
services = {
- MyService = {
+ TestService = {
methods = {
RequestBytesThrottledCall = {
request_bytes_throttler = {
@@ -622,7 +622,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText));
this->Server_->Configure(config);
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto makeCall = [&] {
auto req = proxy.RequestBytesThrottledCall();
@@ -644,7 +644,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
TYPED_TEST(TRpcTest, OK)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.DoNothing();
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK());
@@ -652,7 +652,7 @@ TYPED_TEST(TRpcTest, OK)
TYPED_TEST(TRpcTest, NoAck)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.DoNothing();
req->SetAcknowledgementTimeout(std::nullopt);
auto rspOrError = req->Invoke().Get();
@@ -661,7 +661,7 @@ TYPED_TEST(TRpcTest, NoAck)
TYPED_TEST(TRpcTest, TransportError)
{
- TMyProxy proxy(this->CreateChannel("localhost:9999"));
+ TTestProxy proxy(this->CreateChannel("localhost:9999"));
auto req = proxy.DoNothing();
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode());
@@ -677,7 +677,7 @@ TYPED_TEST(TRpcTest, NoService)
TYPED_TEST(TRpcTest, NoMethod)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.NotRegistered();
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NRpc::EErrorCode::NoSuchMethod, rspOrError.GetCode());
@@ -686,7 +686,7 @@ TYPED_TEST(TRpcTest, NoMethod)
// NB: Realms are not supported in RPC over GRPC.
TYPED_TEST(TNotGrpcTest, NoSuchRealm)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.DoNothing();
ToProto(req->Header().mutable_realm_id(), TGuid::FromString("1-2-3-4"));
auto rspOrError = req->Invoke().Get();
@@ -696,7 +696,7 @@ TYPED_TEST(TNotGrpcTest, NoSuchRealm)
TYPED_TEST(TRpcTest, ClientTimeout)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultTimeout(TDuration::Seconds(0.5));
auto req = proxy.SlowCall();
auto rspOrError = req->Invoke().Get();
@@ -705,18 +705,18 @@ TYPED_TEST(TRpcTest, ClientTimeout)
TYPED_TEST(TRpcTest, ServerTimeout)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultTimeout(TDuration::Seconds(0.5));
auto req = proxy.SlowCanceledCall();
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(this->CheckTimeoutCode(rspOrError.GetCode()));
- WaitFor(this->MyService_->GetSlowCallCanceled())
+ WaitFor(this->TestService_->GetSlowCallCanceled())
.ThrowOnError();
}
TYPED_TEST(TRpcTest, ClientCancel)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCanceledCall();
auto asyncRspOrError = req->Invoke();
Sleep(TDuration::Seconds(0.5));
@@ -726,13 +726,13 @@ TYPED_TEST(TRpcTest, ClientCancel)
EXPECT_TRUE(asyncRspOrError.IsSet());
auto rspOrError = asyncRspOrError.Get();
EXPECT_TRUE(this->CheckCancelCode(rspOrError.GetCode()));
- WaitFor(this->MyService_->GetSlowCallCanceled())
+ WaitFor(this->TestService_->GetSlowCallCanceled())
.ThrowOnError();
}
TYPED_TEST(TRpcTest, SlowCall)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultTimeout(TDuration::Seconds(2.0));
auto req = proxy.SlowCall();
auto rspOrError = req->Invoke().Get();
@@ -741,7 +741,7 @@ TYPED_TEST(TRpcTest, SlowCall)
TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
std::vector<TFuture<void>> futures;
for (int i = 0; i < 30; ++i) {
auto req = proxy.SlowCall();
@@ -757,7 +757,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
TYPED_TEST(TRpcTest, ConcurrencyLimit)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
std::vector<TFuture<void>> futures;
for (int i = 0; i < 10; ++i) {
auto req = proxy.SlowCall();
@@ -782,7 +782,7 @@ TYPED_TEST(TRpcTest, ConcurrencyLimit)
TYPED_TEST(TRpcTest, NoReply)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.NoReply();
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NRpc::EErrorCode::Unavailable, rspOrError.GetCode());
@@ -790,7 +790,7 @@ TYPED_TEST(TRpcTest, NoReply)
TYPED_TEST(TRpcTest, CustomErrorMessage)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.CustomMessageError();
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NYT::EErrorCode(42), rspOrError.GetCode());
@@ -799,7 +799,7 @@ TYPED_TEST(TRpcTest, CustomErrorMessage)
TYPED_TEST(TRpcTest, ConnectionLost)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCanceledCall();
auto asyncRspOrError = req->Invoke();
@@ -814,13 +814,13 @@ TYPED_TEST(TRpcTest, ConnectionLost)
EXPECT_TRUE(asyncRspOrError.IsSet());
auto rspOrError = asyncRspOrError.Get();
EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode());
- WaitFor(this->MyService_->GetSlowCallCanceled())
+ WaitFor(this->TestService_->GetSlowCallCanceled())
.ThrowOnError();
}
TYPED_TEST(TNotGrpcTest, ProtocolVersionMismatch)
{
- TMyIncorrectProtocolVersionProxy proxy(this->CreateChannel());
+ TTestIncorrectProtocolVersionProxy proxy(this->CreateChannel());
auto req = proxy.SomeCall();
req->set_a(42);
auto rspOrError = req->Invoke().Get();
@@ -829,57 +829,57 @@ TYPED_TEST(TNotGrpcTest, ProtocolVersionMismatch)
TYPED_TEST(TNotGrpcTest, RequiredServerFeatureSupported)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.PassCall();
- req->RequireServerFeature(EMyFeature::Great);
+ req->RequireServerFeature(ETestFeature::Great);
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError);
}
TYPED_TEST(TNotGrpcTest, RequiredServerFeatureNotSupported)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.PassCall();
- req->RequireServerFeature(EMyFeature::Cool);
+ req->RequireServerFeature(ETestFeature::Cool);
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NRpc::EErrorCode::UnsupportedServerFeature, rspOrError.GetCode());
- EXPECT_EQ(static_cast<int>(EMyFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey));
- EXPECT_EQ(ToString(EMyFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey));
+ EXPECT_EQ(static_cast<int>(ETestFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey));
+ EXPECT_EQ(ToString(ETestFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey));
}
TYPED_TEST(TNotGrpcTest, RequiredClientFeatureSupported)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.RequireCoolFeature();
- req->DeclareClientFeature(EMyFeature::Cool);
+ req->DeclareClientFeature(ETestFeature::Cool);
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError);
}
TYPED_TEST(TNotGrpcTest, RequiredClientFeatureNotSupported)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.RequireCoolFeature();
- req->DeclareClientFeature(EMyFeature::Great);
+ req->DeclareClientFeature(ETestFeature::Great);
auto rspOrError = req->Invoke().Get();
EXPECT_EQ(NRpc::EErrorCode::UnsupportedClientFeature, rspOrError.GetCode());
- EXPECT_EQ(static_cast<int>(EMyFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey));
- EXPECT_EQ(ToString(EMyFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey));
+ EXPECT_EQ(static_cast<int>(ETestFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey));
+ EXPECT_EQ(ToString(ETestFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey));
}
TYPED_TEST(TRpcTest, StopWithoutActiveRequests)
{
- auto stopResult = this->MyService_->Stop();
+ auto stopResult = this->TestService_->Stop();
EXPECT_TRUE(stopResult.IsSet());
}
TYPED_TEST(TRpcTest, StopWithActiveRequests)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCall();
auto reqResult = req->Invoke();
Sleep(TDuration::Seconds(0.5));
- auto stopResult = this->MyService_->Stop();
+ auto stopResult = this->TestService_->Stop();
EXPECT_FALSE(stopResult.IsSet());
EXPECT_TRUE(reqResult.Get().IsOK());
Sleep(TDuration::Seconds(0.5));
@@ -888,9 +888,9 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests)
TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop)
{
- auto stopResult = this->MyService_->Stop();
+ auto stopResult = this->TestService_->Stop();
EXPECT_TRUE(stopResult.IsSet());
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCall();
auto reqResult = req->Invoke();
EXPECT_FALSE(reqResult.Get().IsOK());
@@ -898,7 +898,7 @@ TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop)
TYPED_TEST(TRpcTest, CustomMetadata)
{
- TMyProxy proxy(this->CreateChannel());
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.CustomMetadata();
NYT::NRpc::NProto::TCustomMetadataExt customMetadataExt;
(*customMetadataExt.mutable_entries())["key1"] = "value1";
@@ -915,7 +915,7 @@ TYPED_TEST(TGrpcTest, SendMessageLimit)
{
THashMap<TString, NYTree::INodePtr> arguments;
arguments["grpc.max_send_message_length"] = NYT::NYTree::ConvertToNode(1);
- TMyProxy proxy(this->CreateChannel(std::nullopt, std::move(arguments)));
+ TTestProxy proxy(this->CreateChannel(std::nullopt, std::move(arguments)));
auto req = proxy.SomeCall();
req->set_a(42);
auto error = req->Invoke().Get();
diff --git a/yt/yt/core/rpc/unittests/ya.make b/yt/yt/core/rpc/unittests/ya.make
index cbaa5e58c6d..9107d2628cf 100644
--- a/yt/yt/core/rpc/unittests/ya.make
+++ b/yt/yt/core/rpc/unittests/ya.make
@@ -7,4 +7,5 @@ RECURSE(
RECURSE_FOR_TESTS(
main
shutdown
+ allocation_tags
)
diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp
index bb88ff60f80..7aedfb5dfec 100644
--- a/yt/yt/core/tracing/trace_context.cpp
+++ b/yt/yt/core/tracing/trace_context.cpp
@@ -304,6 +304,16 @@ TAllocationTagsPtr TTraceContext::GetAllocationTagsPtr() const noexcept
return AllocationTags_;
}
+void TTraceContext::SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept
+{
+ auto writerGuard = WriterGuard(AllocationTagsLock_);
+
+ // Local guard for setting RefCounted AllocationTags_.
+ auto guard = Guard(AllocationTagsAsRefCountedLock_);
+
+ AllocationTags_ = std::move(allocationTags);
+}
+
void TTraceContext::DoSetAllocationTags(TAllocationTags::TTags&& tags)
{
VERIFY_SPINLOCK_AFFINITY(AllocationTagsLock_);
diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h
index 0a6d6ae6cba..1c83c50b3b1 100644
--- a/yt/yt/core/tracing/trace_context.h
+++ b/yt/yt/core/tracing/trace_context.h
@@ -126,6 +126,8 @@ public:
TAllocationTagsPtr GetAllocationTagsPtr() const noexcept;
+ void SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept;
+
void ClearAllocationTagsPtr() noexcept;
template <typename TTag>
diff --git a/yt/yt/library/ytprof/README.md b/yt/yt/library/ytprof/README.md
new file mode 100644
index 00000000000..b676b29dfb1
--- /dev/null
+++ b/yt/yt/library/ytprof/README.md
@@ -0,0 +1,251 @@
+# ytprof
+
+Библиотека YTProf реализует набор семплирующих профайлеров совместимых с инструментом визуализации [pprof](https://github.com/google/pprof/blob/master/doc/README.md).
+
+Профайлер линкуется в исполняемый файл и запускает HTTP сервер на дебажном порту. HTTP вызовы отдают результат профилирования в формате pprof (сжатый gzip протобуф из [profiler.proto](https://a.yandex-team.ru/arc/trunk/arcadia/yt/yt/library/ytprof/profile.proto)).
+
+Файл с профилем является self contained, содержит внутри себя всю информацию о символах и номерах строк и не требует оригинального бинаря для просмотра.
+
+Для удобного просмотра исходного кода, нужно запускать pprof из корня аркадии с опцией `-trim_path='/-S/:/-B/'`.
+
+## Quick Start
+
+Работа с профайлером, на примере [example/main.cpp](https://a.yandex-team.ru/arc/trunk/arcadia/yt/yt/library/ytprof/example/main.cpp)
+
+(*) Если `ya` запускается через враппер `/usr/local/bin/ya`, то в `ya tool pprof` не работает интерактивный режим. Лучше удалить `/usr/local/bin/ya` со своей машины и использовать другой способ запуска.
+
+```
+# Все команды выполняются из корня аркадии
+
+# Собираем пример
+ya make --build=profile yt/yt/library/ytprof/example
+
+# Запускаем пример в соседнем терминале на порту 10003
+./yt/yt/library/ytprof/example/example 10003
+
+# Смотрим потребление памяти
+ya tool pprof -symbolize=none http://localhost:10003/heap
+Fetching profile over HTTP from http://localhost:10003/heap
+Saved profile in /home/prime/pprof/pprof.example.allocations.space.002.pb.gz
+File: example
+generated by ytprof 0.2
+Type: space
+Entering interactive mode (type "help" for commands, "o" for options)
+(pprof) list main
+Total: 64.80MB
+ROUTINE ======================== main in /home/prime/arc/yt/yt/library/ytprof/example/main.cpp
+ 0 62.38MB (flat, cum) 96.25% of Total
+ . . 30: Register(server, "");
+ . . 31: server->Start();
+ . . 32:
+ . . 33: THashMap<TString, std::vector<int>> data;
+ . . 34: for (int i = 0; i < 1024 * 16; i++) {
+ . 62.38MB 35: data[ToString(i)].resize(1024);
+ . . 36: }
+ . . 37:
+ . . 38: ui64 value = 0;
+ . . 39: while (true) {
+ . . 40: THash<TString> hasher;
+
+# Смотрим потребление CPU в течении 15 секунд
+ya tool pprof -symbolize=none 'http://localhost:10003/profile?d=15'
+Fetching profile over HTTP from http://localhost:10003/profile?d=15
+Saved profile in /home/prime/pprof/pprof.example.sample.profile.004.pb.gz
+File: example
+generated by ytprof 0.2
+arc_revision=8788163
+build_type=profile
+Type: cpu
+Entering interactive mode (type "help" for commands, "o" for options)
+(pprof) top
+Showing nodes accounting for 11840ms, 78.83% of 15020ms total
+Dropped 45 nodes (cum <= 75.10ms)
+Showing top 10 nodes out of 53
+ flat flat% sum% cum cum%
+ 5250ms 34.95% 34.95% 5250ms 34.95% (anonymous namespace)::TBasicIntFormatter::Format
+ 2610ms 17.38% 52.33% 3760ms 25.03% CityHash64
+ 1000ms 6.66% 58.99% 1000ms 6.66% ReadUnaligned (inline)
+ 980ms 6.52% 65.51% 980ms 6.52% memcpyU256
+ 530ms 3.53% 69.04% 530ms 3.53% (anonymous namespace)::TIntFormatter::Format
+ 360ms 2.40% 71.44% 360ms 2.40% TcmallocSlab_Internal_Pop_trampoline_2
+ 330ms 2.20% 73.64% 330ms 2.20% TcmallocSlab_Internal_Push_trampoline_7
+ 280ms 1.86% 75.50% 280ms 1.86% std::__y1::basic_string::__is_long
+ 260ms 1.73% 77.23% 4020ms 26.76% NHashPrivate::ComputeStringHash
+ 240ms 1.60% 78.83% 240ms 1.60% tcmalloc::tcmalloc_internal::PageMap2::sizeclass
+(pprof) list main
+Total: 15.02s
+ROUTINE ======================== main in /home/prime/arc/yt/yt/library/ytprof/example/main.cpp
+ 150ms 13.99s (flat, cum) 93.14% of Total
+ . . 1:#include <yt/yt/core/concurrency/poller.h>
+ . . 2:#include <yt/yt/core/concurrency/thread_pool_poller.h>
+ . . 3:#include <yt/yt/core/concurrency/action_queue.h>
+ . . 4:#include <yt/yt/core/http/server.h>
+ . . 5:
+ . . 6:#include <yt/yt/library/ytprof/http/handler.h>
+ . . 7:#include <yt/yt/library/ytprof/heap_profiler.h>
+ . . 8:
+ . . 9:#include <absl/debugging/stacktrace.h>
+ . . 10:
+ . . 11:using namespace NYT;
+ . . 12:using namespace NYT::NHttp;
+ . . 13:using namespace NYT::NConcurrency;
+ . . 14:using namespace NYT::NYTProf;
+ . . 15:
+ . . 16:int main(int argc, char* argv[])
+ . . 17:{
+ . . 18: absl::SetStackUnwinder(AbslStackUnwinder);
+ . . 19: tcmalloc::MallocExtension::SetProfileSamplingRate(2_MB);
+ . . 20:
+ . . 21: try {
+ . . 22: if (argc != 2 && argc != 3) {
+ . . 23: throw yexception() << "usage: " << argv[0] << " PORT";
+ . . 24: }
+ . . 25:
+ . . 26: auto port = FromString<int>(argv[1]);
+ . . 27: auto poller = CreateThreadPoolPoller(1, "Example");
+ . . 28: auto server = CreateServer(port, poller);
+ . . 29:
+ . . 30: Register(server, "");
+ . . 31: server->Start();
+ . . 32:
+ . . 33: THashMap<TString, std::vector<int>> data;
+ . . 34: for (int i = 0; i < 1024 * 16; i++) {
+ . . 35: data[ToString(i)].resize(1024);
+ . . 36: }
+ . . 37:
+ . . 38: ui64 value = 0;
+ . . 39: while (true) {
+ . . 40: THash<TString> hasher;
+ . . 41: for (int i = 0; i < 10000000; i++) {
+ . 13.75s 42: value += hasher(ToString(i));
+ . . 43: }
+ . . 44:
+ . . 45: std::vector<TString> data;
+ . . 46: for (int i = 0; i < 10000; i++) {
+ . 80ms 47: data.push_back(TString(1024, 'x'));
+ . . 48: }
+ . . 49:
+ . . 50: if (value == 1) {
+ . . 51: Sleep(TDuration::Seconds(1));
+ . . 52: }
+ . 10ms 53: }
+ . . 54: } catch (const std::exception& ex) {
+ . . 55: Cerr << ex.what() << Endl;
+ . . 56: _exit(1);
+ . . 57: }
+ . . 58:
+```
+
+## CPU профайлер
+
+CPU профайлер реализован на основе сигнала SIGPROF. В выключенном состоянии
+профайлер никак не влияет на работу программы. В включенном состоянии ядро
+начинает посылать сигналы всем тредам процесса. Обработчик сигнала собирает текущий стек и пушит его в lock free очередь.
+
+Для работы CPU профайлера нужно чтобы исполняемый файл был собран с опцией `--build=profile`. Добавление этой опции может замедлить исполнение на программы на несколько процентов, поэтому рекомендуется совмещать `--build=profile` с флагом `--thinlto`. Такая конфигурация должна работать на пару процентов быстрее, чем обычный `--build=release`.
+
+### CPU теги
+
+YTProf поддерживает теги. Теги позволяют фильтровать профиль в pprof с помощью команды
+`tagfocus`. Например, можно положить в тег id эксперимента или имя текущего пользователя.
+
+`TCpuProfilerTagGuard` кладёт тег в TLS треда. Также есть возможность положить тег в `TTraceContext` запроса, чтобы он автоматически пробрасывался сквозь асинхронный код YT файберов.
+
+### Тяжелые actions
+
+Профайлер умеет тегировать семплы временем исполнения в YT тред пуле. Это позволяет находить код, который надолго заблокировал какой-то тред.
+
+- Можно включить теги, чтобы фильтровать в pprof `/profile?record_action_run_time=1`.
+- Можно сразу отфильтровать только семплы из долгих экшенов `/profile?action_min_exec_time=1s`
+
+```
+# После этого, можно отфильтровать семплы командой tagfocus.
+(pprof) tagfocus=action_run_time_us=1000000:
+# И посмотреть список семплов.
+(pprof) traces
+```
+
+## Memory Profiler
+
+Для работы memory профайлера нужна поддержка в аллокаторе. На текущий момент, такая
+поддержка есть только в tcmalloc.
+
+Чтобы включить memory profiler, нужно:
+
+ 1. Слинковаться с одной конфигураций tcmalloc, дописав в ya.make своей программы:
+ ```
+ ALLOCATOR(TCMALLOC)
+ ```
+ 2. На раннем этапе инициализации приложения, до того как началось выделение больших пользовательских объектов
+ написать код конфигурации профайлера.
+ ```c++
+ #include <yt/yt/library/ytprof/heap_profiler.h>
+ #include <absl/debugging/stacktrace.h>
+
+ int main() {
+ absl::SetStackUnwinder(AbslStackUnwinder);
+ tcmalloc::MallocExtension::SetProfileSamplingRate(2_MB);
+
+ ...
+ }
+ ```
+
+tcmalloc поддерживает 4 вида профилей:
+- `heap` - снепшот текущего потребления памяти. Это основной профиль, который вам интересен.
+- `peak` - снепшот потребления памяти в момент, когда приложение потребляло максимальный объем памяти. Профиль удобен, чтобы искать внезапный пик потребления, который уже ушёл.
+- `allocations` - профиль выделения памяти за интервал. Полезен для того, чтобы оптимизировать производительность. Код, который выделяет и освобождает вектор в горячем цикле, не будет виден в `heap` профиле потому что в каждый момент времени жив всего один блок памяти. Но будет виден в этом профиле.
+- `fragmentation` - профиль фрагментации памяти. Small объекты в tcmalloc
+выделяются из спанов. Спан - это массив small объектов одинакового размера. Спан не вернётся назад в общий пул памяти, пока не освободят все объекты в нём. В худшем случае, всю память могут занимать такие спаны с одним живым объектом внутри. Этот тип профиля позволяет находить, какие аллокации приводят к подобной проблеме.
+
+Memory profiler работает в любом типе сборки, кроме сборки с санитайзерами.
+
+## Spinlock Profiler
+
+ytprof поддерживает профилирование 2-х видов спинлоков.
+- `lock` - профиль спинлоков из `absl`. Эти спинлоки используются внутри tcmalloc.
+- `block` - профиль спинлоков из `library/cpp/yt/threading`
+
+### Быстрые примеры для YT
+
+```
+# Текущее потребление ноды
+curl http://vla0-8040-co-node-arnold.vla.yp-c.yandex.net:10013/ytprof/heap > ../heap.pb.gz
+# Пиковое потребение ноды
+curl 'http://vla2-8153-node-seneca-vla.vla.yp-c.yandex.net:10012/ytprof/peak' > ../peak.pb.gz
+# Профиль аллокаций ноды
+curl 'http://vla2-8153-node-seneca-vla.vla.yp-c.yandex.net:10012/ytprof/allocations?d=60' > ../allocations_node.pb.gz
+# CPU профиль мастера
+curl 'http://m002-hahn.sas.yp-c.yandex.net:10010/ytprof/profile' > ../profile.pb.gz
+# CPU профиль шедулера с повышенной частотой
+curl 'http://sas5-9718-scheduler-hahn.sas.yp-c.yandex.net:10011/ytprof/profile?freq=1000' > ../profile_1000.pb.gz
+
+# Смотрим профиль из корня аркадии. trim_path можно поправить, если исходники не находятся.
+ya tool pprof -symbolize=none -trim_path='/-S/:/-B/:/home/teamcity/source/Yt_ArcRelease' ../heap.pb.gz
+File: ytserver-node
+generated by ytprof 0.3
+binary_version=22.1.9091475-stable-ya~b6081b45d6f7f85a
+arc_revision=b6081b45d6f7f85ab6f9772f31c277fe01de886c
+build_type=profile
+Type: space
+Entering interactive mode (type "help" for commands, "o" for options)
+# Сохраняет svg файл. В нём проще всего найти интересные функции, чтобы потом смотреть на них более подробно
+(pprof) svg
+# Показывает топ функций. Удобно в cpu, в памяти скорее всего будет показывать функции аллокации
+(pprof) top
+# Сортирует не по потреблению в функции, а по потреблению в самой функции и всех её детях
+(pprof) sort=cum
+# Переключает вес семпла на штуки. По дефолту семплы взвешиваются в секундах или мегабайтах
+(pprof) sample_index=0
+# Показывает код функции. Параметр - это регулярное выражение
+(pprof) list IOutputStream::Write
+# Фильтрует семплы по треду
+(pprof) tagfocus=thread=StorageHeavy
+# Аллокации по размеру
+(pprof) tagfocus=allocated_size=2mb:
+# Фильтрует экшены, которые бежали больше 10ms
+(pprof) tagfocus=action_run_time_us=10000:
+# Убирает фильтр
+(pprof) tagfocus=
+# Показывает сырые семплы
+(pprof) traces
+```
diff --git a/yt/yt/library/ytprof/build_info.cpp b/yt/yt/library/ytprof/build_info.cpp
new file mode 100644
index 00000000000..97f3652485d
--- /dev/null
+++ b/yt/yt/library/ytprof/build_info.cpp
@@ -0,0 +1,34 @@
+#include "build_info.h"
+
+#include <library/cpp/svnversion/svnversion.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TBuildInfo TBuildInfo::GetDefault()
+{
+ TBuildInfo buildInfo;
+
+ buildInfo.BuildType = YTPROF_BUILD_TYPE;
+ buildInfo.BuildType.to_lower(); // no shouting
+
+ if (GetVCS() == TString{"arc"}) {
+ buildInfo.ArcRevision = GetProgramCommitId();
+ }
+
+ return buildInfo;
+}
+
+bool IsProfileBuild()
+{
+#ifdef YTPROF_PROFILE_BUILD
+ return true;
+#else
+ return false;
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/build_info.h b/yt/yt/library/ytprof/build_info.h
new file mode 100644
index 00000000000..c2e8a8ed1e1
--- /dev/null
+++ b/yt/yt/library/ytprof/build_info.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TBuildInfo
+{
+ // Semantic version of current binary.
+ //
+ // Meaning of this field is application specific. Empty by default.
+ TString BinaryVersion;
+
+ // ArcRevision this binary was built from.
+ TString ArcRevision;
+ bool ArcDirty;
+
+ // BuildType this binary was built with.
+ TString BuildType;
+
+ static TBuildInfo GetDefault();
+};
+
+bool IsProfileBuild();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/cpu_profiler.cpp b/yt/yt/library/ytprof/cpu_profiler.cpp
new file mode 100644
index 00000000000..e04aefa4249
--- /dev/null
+++ b/yt/yt/library/ytprof/cpu_profiler.cpp
@@ -0,0 +1,201 @@
+#include "cpu_profiler.h"
+#include "symbolize.h"
+
+#if defined(_linux_)
+#include <link.h>
+#include <sys/syscall.h>
+#include <sys/prctl.h>
+
+#include <library/cpp/yt/cpu_clock/clock.h>
+
+#include <library/cpp/yt/backtrace/cursors/frame_pointer/frame_pointer_cursor.h>
+
+#include <library/cpp/yt/backtrace/cursors/interop/interop.h>
+
+#include <util/system/yield.h>
+
+#include <csignal>
+#include <functional>
+#endif
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if not defined(_linux_)
+
+TCpuProfiler::TCpuProfiler(TCpuProfilerOptions options)
+ : TSignalSafeProfiler(options)
+{ }
+
+TCpuProfiler::~TCpuProfiler()
+{ }
+
+void TCpuProfiler::EnableProfiler()
+{ }
+
+void TCpuProfiler::DisableProfiler()
+{ }
+
+void TCpuProfiler::AnnotateProfile(NProto::Profile* /* profile */, const std::function<i64(const TString&)>& /* stringify */)
+{ }
+
+i64 TCpuProfiler::EncodeValue(i64 value)
+{
+ return value;
+}
+
+TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration)
+{
+ return {};
+}
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if defined(_linux_)
+
+std::atomic<TCpuProfiler*> TCpuProfiler::ActiveProfiler_;
+std::atomic<bool> TCpuProfiler::HandlingSigprof_;
+
+TCpuProfiler::TCpuProfiler(TCpuProfilerOptions options)
+ : TSignalSafeProfiler(options)
+ , Options_(options)
+ , ProfilePeriod_(1000000 / Options_.SamplingFrequency)
+{ }
+
+TCpuProfiler::~TCpuProfiler()
+{
+ Stop();
+}
+
+void TCpuProfiler::SigProfHandler(int /* sig */, siginfo_t* info, void* ucontext)
+{
+ int savedErrno = errno;
+
+ while (HandlingSigprof_.exchange(true)) {
+ SchedYield();
+ }
+
+ auto profiler = ActiveProfiler_.load();
+ if (profiler) {
+ profiler->OnSigProf(info, reinterpret_cast<ucontext_t*>(ucontext));
+ }
+
+ HandlingSigprof_.store(false);
+
+ errno = savedErrno;
+}
+
+void TCpuProfiler::EnableProfiler()
+{
+ TCpuProfiler* expected = nullptr;
+ if (!ActiveProfiler_.compare_exchange_strong(expected, this)) {
+ throw yexception() << "Another instance of CPU profiler is running";
+ }
+
+ struct sigaction sig;
+ sig.sa_flags = SA_SIGINFO | SA_RESTART;
+ sigemptyset(&sig.sa_mask);
+ sig.sa_sigaction = &TCpuProfiler::SigProfHandler;
+
+ if (sigaction(SIGPROF, &sig, NULL) != 0) {
+ throw TSystemError(LastSystemError());
+ }
+
+ itimerval period;
+ period.it_value.tv_sec = 0;
+ period.it_value.tv_usec = 1000000 / Options_.SamplingFrequency;
+ period.it_interval = period.it_value;
+
+ if (setitimer(ITIMER_PROF, &period, nullptr) != 0) {
+ throw TSystemError(LastSystemError());
+ }
+}
+
+void TCpuProfiler::DisableProfiler()
+{
+ itimerval period{};
+ if (setitimer(ITIMER_PROF, &period, nullptr) != 0) {
+ throw TSystemError(LastSystemError());
+ }
+
+ struct sigaction sig;
+ sig.sa_flags = SA_RESTART;
+ sigemptyset(&sig.sa_mask);
+ sig.sa_handler = SIG_IGN;
+
+ if (sigaction(SIGPROF, &sig, NULL) != 0) {
+ throw TSystemError(LastSystemError());
+ }
+
+ ActiveProfiler_ = nullptr;
+ while (HandlingSigprof_ > 0) {
+ SchedYield();
+ }
+}
+
+void TCpuProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify)
+{
+ auto sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("sample"));
+ sampleType->set_unit(stringify("count"));
+
+ sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("cpu"));
+ sampleType->set_unit(stringify("microseconds"));
+
+ auto periodType = profile->mutable_period_type();
+ periodType->set_type(stringify("cpu"));
+ periodType->set_unit(stringify("microseconds"));
+
+ profile->set_period(1000000 / Options_.SamplingFrequency);
+
+ if (SignalOverruns_ > 0) {
+ profile->add_comment(stringify("cpu.signal_overruns=" + std::to_string(SignalOverruns_)));
+ }
+}
+
+i64 TCpuProfiler::EncodeValue(i64 value)
+{
+ return value;
+}
+
+void TCpuProfiler::OnSigProf(siginfo_t* info, ucontext_t* ucontext)
+{
+ SignalOverruns_ += info->si_overrun;
+
+ for (const auto& filter : Options_.SampleFilters) {
+ if (!filter()) {
+ // This sample is filtered out.
+ return;
+ }
+ }
+
+ auto cursorContext = NBacktrace::FramePointerCursorContextFromUcontext(*ucontext);
+ NBacktrace::TFramePointerCursor cursor(&Reader_, cursorContext);
+
+ RecordSample(&cursor, ProfilePeriod_);
+}
+
+TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration minExecTime)
+{
+ auto minCpuDuration = DurationToCpuDuration(minExecTime);
+
+ return [minCpuDuration] () {
+ auto fiberStartTime = GetTraceContextTimingCheckpoint();
+ if (fiberStartTime == 0) {
+ return false;
+ }
+
+ auto delta = GetCpuInstant() - fiberStartTime;
+ return delta > minCpuDuration;
+ };
+}
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/cpu_profiler.h b/yt/yt/library/ytprof/cpu_profiler.h
new file mode 100644
index 00000000000..81519ba368b
--- /dev/null
+++ b/yt/yt/library/ytprof/cpu_profiler.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <thread>
+#include <array>
+#include <variant>
+
+#if defined(_linux_)
+#include <sys/types.h>
+#endif
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+#include <yt/yt/library/ytprof/api/api.h>
+
+#include <library/cpp/yt/memory/intrusive_ptr.h>
+
+#include <util/generic/hash.h>
+#include <util/datetime/base.h>
+
+#include "queue.h"
+#include "signal_safe_profiler.h"
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TCpuProfilerOptions
+ : public TSignalSafeProfilerOptions
+{
+public:
+ int SamplingFrequency = 100;
+
+ using TSampleFilter = std::function<bool()>;
+ std::vector<TSampleFilter> SampleFilters;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TCpuProfiler
+ : public TSignalSafeProfiler
+{
+public:
+ explicit TCpuProfiler(TCpuProfilerOptions options = {});
+ ~TCpuProfiler();
+
+private:
+#if defined(_linux_)
+ const TCpuProfilerOptions Options_;
+ const i64 ProfilePeriod_;
+
+ static std::atomic<TCpuProfiler*> ActiveProfiler_;
+ static std::atomic<bool> HandlingSigprof_;
+ static void SigProfHandler(int sig, siginfo_t* info, void* ucontext);
+
+ std::atomic<i64> SignalOverruns_{0};
+
+ void OnSigProf(siginfo_t* info, ucontext_t* ucontext);
+#endif
+
+ void EnableProfiler() override;
+ void DisableProfiler() override;
+ void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override;
+ i64 EncodeValue(i64 value) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration minExecTime);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/external_pprof.cpp b/yt/yt/library/ytprof/external_pprof.cpp
new file mode 100644
index 00000000000..71419f82f94
--- /dev/null
+++ b/yt/yt/library/ytprof/external_pprof.cpp
@@ -0,0 +1,56 @@
+#include "external_pprof.h"
+
+#include "profile.h"
+
+#include <library/cpp/resource/resource.h>
+
+#include <util/folder/tempdir.h>
+#include <util/system/file.h>
+#include <util/stream/file.h>
+
+#include <util/generic/string.h>
+
+#include <vector>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void SymbolizeByExternalPProf(NProto::Profile* profile, const TSymbolizationOptions& options)
+{
+ TTempDir tmpDir = TTempDir::NewTempDir(options.TmpDir);
+ if (options.KeepTmpDir) {
+ tmpDir.DoNotRemove();
+ }
+
+ auto pprofBinary = NResource::Find("/ytprof/pprof");
+ auto llvmSymbolizerBinary = NResource::Find("/ytprof/llvm-symbolizer");
+
+ auto writeFile = [&] (const TString& name) {
+ TFile file{tmpDir.Path() / name, EOpenModeFlag::CreateAlways|EOpenModeFlag::WrOnly|EOpenModeFlag::AX|EOpenModeFlag::ARW};
+ auto binary = NResource::Find("/ytprof/" + name);
+ file.Write(binary.data(), binary.size());
+ };
+
+ writeFile("pprof");
+ writeFile("llvm-symbolizer");
+
+ TFileOutput output(tmpDir.Path() / "in.pb.gz");
+ WriteProfile(&output, *profile);
+ output.Finish();
+
+ options.RunTool(std::vector<TString>{
+ tmpDir.Path() / "pprof",
+ "-proto",
+ "-output=" + (tmpDir.Path() / "out.pb.gz").GetPath(),
+ "-tools=" + tmpDir.Path().GetPath(),
+ (tmpDir.Path() / "in.pb.gz").GetPath()
+ });
+
+ TFileInput input(tmpDir.Path() / "out.pb.gz");
+ ReadProfile(&input, profile);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/external_pprof.h b/yt/yt/library/ytprof/external_pprof.h
new file mode 100644
index 00000000000..d7855a50b1f
--- /dev/null
+++ b/yt/yt/library/ytprof/external_pprof.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TSymbolizationOptions
+{
+ TString TmpDir = "/tmp";
+
+ bool KeepTmpDir = false;
+
+ std::function<void(const std::vector<TString>&)> RunTool;
+};
+
+void SymbolizeByExternalPProf(
+ NProto::Profile* profile,
+ const TSymbolizationOptions& options);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/heap_profiler.cpp b/yt/yt/library/ytprof/heap_profiler.cpp
new file mode 100644
index 00000000000..2fecf95473d
--- /dev/null
+++ b/yt/yt/library/ytprof/heap_profiler.cpp
@@ -0,0 +1,261 @@
+#include "heap_profiler.h"
+
+#include "symbolize.h"
+
+#include <library/cpp/yt/memory/leaky_singleton.h>
+
+#include <library/cpp/yt/threading/spin_lock.h>
+
+#include <library/cpp/yt/backtrace/cursors/libunwind/libunwind_cursor.h>
+
+#include <util/generic/hash_set.h>
+#include <util/string/join.h>
+
+#include <tcmalloc/malloc_extension.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+using namespace NTracing;
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_WEAK void* CreateAllocationTagsData()
+{
+ return nullptr;
+}
+
+Y_WEAK void* CopyAllocationTagsData(void* userData)
+{
+ return userData;
+}
+
+Y_WEAK void DestroyAllocationTagsData(void* /*userData*/)
+{ }
+
+Y_WEAK const std::vector<std::pair<TString, TString>>& ReadAllocationTagsData(void* /*userData*/)
+{
+ static const std::vector<std::pair<TString, TString>> emptyTags;
+ return emptyTags;
+}
+
+Y_WEAK std::optional<TString> FindTagValue(
+ const std::vector<std::pair<TString, TString>>& tags,
+ const TString& key)
+{
+ Y_UNUSED(tags);
+ Y_UNUSED(key);
+ return ToString(NullMemoryTag);
+}
+
+Y_WEAK void StartAllocationTagsCleanupThread(TDuration /*cleanupInterval*/)
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+namespace NYT::NYTProf {
+
+using namespace NThreading;
+using namespace NTracing;
+
+////////////////////////////////////////////////////////////////////////////////
+
+NProto::Profile ConvertAllocationProfile(const tcmalloc::Profile& snapshot)
+{
+ NProto::Profile profile;
+ profile.add_string_table();
+
+ auto addString = [&] (TString str) {
+ auto index = profile.string_table_size();
+ profile.add_string_table(str);
+ return index;
+ };
+
+ auto sampleType = profile.add_sample_type();
+ sampleType->set_type(addString("allocations"));
+ sampleType->set_unit(addString("count"));
+
+ sampleType = profile.add_sample_type();
+ sampleType->set_type(addString("space"));
+
+ auto bytesUnitId = addString("bytes");
+ sampleType->set_unit(bytesUnitId);
+
+ auto periodType = profile.mutable_period_type();
+ periodType->set_type(sampleType->type());
+ periodType->set_unit(sampleType->unit());
+
+ profile.set_period(snapshot.Period());
+
+ auto allocatedSizeId = addString("allocated_size");
+ auto requestedSizeId = addString("requested_size");
+ auto requestedAlignmentId = addString("requested_alignment");
+
+ THashMap<void*, ui64> locations;
+ snapshot.Iterate([&] (const tcmalloc::Profile::Sample& sample) {
+ auto sampleProto = profile.add_sample();
+ sampleProto->add_value(sample.count);
+ sampleProto->add_value(sample.sum);
+
+ auto allocatedSizeLabel = sampleProto->add_label();
+ allocatedSizeLabel->set_key(allocatedSizeId);
+ allocatedSizeLabel->set_num(sample.allocated_size);
+ allocatedSizeLabel->set_num_unit(bytesUnitId);
+
+ auto requestedSizeLabel = sampleProto->add_label();
+ requestedSizeLabel->set_key(requestedSizeId);
+ requestedSizeLabel->set_num(sample.requested_size);
+ requestedSizeLabel->set_num_unit(bytesUnitId);
+
+ auto requestedAlignmentLabel = sampleProto->add_label();
+ requestedAlignmentLabel->set_key(requestedAlignmentId);
+ requestedAlignmentLabel->set_num(sample.requested_alignment);
+ requestedAlignmentLabel->set_num_unit(bytesUnitId);
+
+ for (int i = 0; i < sample.depth; i++) {
+ auto ip = sample.stack[i];
+
+ auto it = locations.find(ip);
+ if (it != locations.end()) {
+ sampleProto->add_location_id(it->second);
+ continue;
+ }
+
+ auto locationId = locations.size() + 1;
+
+ auto location = profile.add_location();
+ location->set_address(reinterpret_cast<ui64>(ip));
+ location->set_id(locationId);
+
+ sampleProto->add_location_id(locationId);
+ locations[ip] = locationId;
+ }
+
+ // TODO(gepardo): Deduplicate values in string table
+ for (const auto& [key, value] : ReadAllocationTagsData(sample.user_data)) {
+ auto label = sampleProto->add_label();
+ label->set_key(addString(key));
+ label->set_str(addString(value));
+ }
+ });
+
+ profile.set_drop_frames(addString(JoinSeq("|", {
+ ".*SampleifyAllocation",
+ ".*AllocSmall",
+ "slow_alloc",
+ "TBasicString::TBasicString",
+ })));
+
+ Symbolize(&profile, true);
+ return profile;
+}
+
+NProto::Profile ReadHeapProfile(tcmalloc::ProfileType profileType)
+{
+ auto snapshot = tcmalloc::MallocExtension::SnapshotCurrent(profileType);
+ return ConvertAllocationProfile(snapshot);
+}
+
+THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage()
+{
+ THashMap<TMemoryTag, ui64> usage;
+
+ auto snapshot = tcmalloc::MallocExtension::SnapshotCurrent(tcmalloc::ProfileType::kHeap);
+ snapshot.Iterate([&] (const tcmalloc::Profile::Sample& sample) {
+ auto maybeMemoryTagStr = FindTagValue(
+ ReadAllocationTagsData(sample.user_data),
+ MemoryTagLiteral);
+
+ if (maybeMemoryTagStr) {
+ auto memoryTag = FromString<TMemoryTag>(maybeMemoryTagStr.value());
+ if (memoryTag != NullMemoryTag) {
+ usage[memoryTag] += sample.sum;
+ }
+ }
+ });
+
+ return usage;
+}
+
+static thread_local TMemoryTag MemoryTag = 0;
+
+TMemoryTag GetMemoryTag()
+{
+ return MemoryTag;
+}
+
+TMemoryTag SetMemoryTag(TMemoryTag newTag)
+{
+ auto oldTag = MemoryTag;
+ MemoryTag = newTag;
+ return oldTag;
+}
+
+struct TMemoryUsageSnapshot
+{
+ TSpinLock Lock;
+ THashMap<TMemoryTag, ui64> Snapshot;
+};
+
+void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot)
+{
+ auto snapshot = LeakySingleton<TMemoryUsageSnapshot>();
+ auto guard = Guard(snapshot->Lock);
+ snapshot->Snapshot = std::move(usageSnapshot);
+}
+
+i64 GetEstimatedMemoryUsage(TMemoryTag tag)
+{
+ auto snapshot = LeakySingleton<TMemoryUsageSnapshot>();
+ auto guard = Guard(snapshot->Lock);
+ auto it = snapshot->Snapshot.find(tag);
+ if (it != snapshot->Snapshot.end()) {
+ return it->second;
+ }
+ return 0;
+}
+
+int AbslStackUnwinder(
+ void** frames,
+ int*,
+ int maxFrames,
+ int skipFrames,
+ const void*,
+ int*)
+{
+ NBacktrace::TLibunwindCursor cursor;
+
+ for (int i = 0; i < skipFrames + 1; ++i) {
+ cursor.MoveNext();
+ }
+
+ int count = 0;
+ for (int i = 0; i < maxFrames; ++i) {
+ if (cursor.IsFinished()) {
+ return count;
+ }
+
+ // IP point's to return address. Substract 1 to get accurate line information for profiler.
+ frames[i] = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(cursor.GetCurrentIP()) - 1);
+ count++;
+
+ cursor.MoveNext();
+ }
+ return count;
+}
+
+void EnableMemoryProfilingTags()
+{
+ StartAllocationTagsCleanupThread(TDuration::Seconds(1));
+ tcmalloc::MallocExtension::SetSampleUserDataCallbacks(
+ &CreateAllocationTagsData,
+ &CopyAllocationTagsData,
+ &DestroyAllocationTagsData);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/heap_profiler.h b/yt/yt/library/ytprof/heap_profiler.h
new file mode 100644
index 00000000000..55752b188de
--- /dev/null
+++ b/yt/yt/library/ytprof/heap_profiler.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+
+#include <yt/yt/core/tracing/public.h>
+
+#include <util/generic/hash.h>
+
+#include <tcmalloc/malloc_extension.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NProto::Profile ConvertAllocationProfile(const tcmalloc::Profile& snapshot);
+
+NProto::Profile ReadHeapProfile(tcmalloc::ProfileType profileType);
+
+int AbslStackUnwinder(void** frames, int*,
+ int maxFrames, int skipFrames,
+ const void*,
+ int*);
+
+typedef uintptr_t TMemoryTag;
+
+TMemoryTag GetMemoryTag();
+
+TMemoryTag SetMemoryTag(TMemoryTag newTag);
+
+THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage();
+
+void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot);
+
+i64 GetEstimatedMemoryUsage(TMemoryTag tag);
+
+void EnableMemoryProfilingTags();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/profile.cpp b/yt/yt/library/ytprof/profile.cpp
new file mode 100644
index 00000000000..ed4cc3f9852
--- /dev/null
+++ b/yt/yt/library/ytprof/profile.cpp
@@ -0,0 +1,34 @@
+#include "profile.h"
+
+#include <util/stream/str.h>
+#include <util/stream/zlib.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void WriteProfile(IOutputStream* out, const NProto::Profile& profile)
+{
+ TZLibCompress compress(out, ZLib::StreamType::GZip);
+ profile.SerializeToArcadiaStream(&compress);
+ compress.Finish();
+}
+
+TString SerializeProfile(const NProto::Profile& profile)
+{
+ TStringStream stream;
+ WriteProfile(&stream, profile);
+ return stream.Str();
+}
+
+void ReadProfile(IInputStream* in, NProto::Profile* profile)
+{
+ profile->Clear();
+
+ TZLibDecompress decompress(in);
+ profile->ParseFromArcadiaStream(&decompress);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/profile.h b/yt/yt/library/ytprof/profile.h
new file mode 100644
index 00000000000..6283c200d27
--- /dev/null
+++ b/yt/yt/library/ytprof/profile.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <util/stream/fwd.h>
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void WriteProfile(IOutputStream* out, const NProto::Profile& profile);
+
+TString SerializeProfile(const NProto::Profile& profile);
+
+void ReadProfile(IInputStream* in, NProto::Profile* profile);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/profile.proto b/yt/yt/library/ytprof/profile.proto
new file mode 100644
index 00000000000..18eaba3e56a
--- /dev/null
+++ b/yt/yt/library/ytprof/profile.proto
@@ -0,0 +1,209 @@
+// Copyright 2016 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Profile is a common stacktrace profile format.
+//
+// Measurements represented with this format should follow the
+// following conventions:
+//
+// - Consumers should treat unset optional fields as if they had been
+// set with their default value.
+//
+// - When possible, measurements should be stored in "unsampled" form
+// that is most useful to humans. There should be enough
+// information present to determine the original sampled values.
+//
+// - On-disk, the serialized proto must be gzip-compressed.
+//
+// - The profile is represented as a set of samples, where each sample
+// references a sequence of locations, and where each location belongs
+// to a mapping.
+// - There is a N->1 relationship from sample.location_id entries to
+// locations. For every sample.location_id entry there must be a
+// unique Location with that id.
+// - There is an optional N->1 relationship from locations to
+// mappings. For every nonzero Location.mapping_id there must be a
+// unique Mapping with that id.
+
+syntax = "proto3";
+
+package NYT.NYTProf.NProto;
+
+message Profile {
+ // A description of the samples associated with each Sample.value.
+ // For a cpu profile this might be:
+ // [["cpu","nanoseconds"]] or [["wall","seconds"]] or [["syscall","count"]]
+ // For a heap profile, this might be:
+ // [["allocations","count"], ["space","bytes"]],
+ // If one of the values represents the number of events represented
+ // by the sample, by convention it should be at index 0 and use
+ // sample_type.unit == "count".
+ repeated ValueType sample_type = 1;
+ // The set of samples recorded in this profile.
+ repeated Sample sample = 2;
+ // Mapping from address ranges to the image/binary/library mapped
+ // into that address range. mapping[0] will be the main binary.
+ repeated Mapping mapping = 3;
+ // Useful program location
+ repeated Location location = 4;
+ // Functions referenced by locations
+ repeated Function function = 5;
+ // A common table for strings referenced by various messages.
+ // string_table[0] must always be "".
+ repeated string string_table = 6;
+ // frames with Function.function_name fully matching the following
+ // regexp will be dropped from the samples, along with their successors.
+ int64 drop_frames = 7; // Index into string table.
+ // frames with Function.function_name fully matching the following
+ // regexp will be kept, even if it matches drop_functions.
+ int64 keep_frames = 8; // Index into string table.
+
+ // The following fields are informational, do not affect
+ // interpretation of results.
+
+ // Time of collection (UTC) represented as nanoseconds past the epoch.
+ int64 time_nanos = 9;
+ // Duration of the profile, if a duration makes sense.
+ int64 duration_nanos = 10;
+ // The kind of events between sampled ocurrences.
+ // e.g [ "cpu","cycles" ] or [ "heap","bytes" ]
+ ValueType period_type = 11;
+ // The number of events between sampled occurrences.
+ int64 period = 12;
+ // Freeform text associated to the profile.
+ repeated int64 comment = 13; // Indices into string table.
+ // Index into the string table of the type of the preferred sample
+ // value. If unset, clients should default to the last sample value.
+ int64 default_sample_type = 14;
+}
+
+// ValueType describes the semantics and measurement units of a value.
+message ValueType {
+ int64 type = 1; // Index into string table.
+ int64 unit = 2; // Index into string table.
+}
+
+// Each Sample records values encountered in some program
+// context. The program context is typically a stack trace, perhaps
+// augmented with auxiliary information like the thread-id, some
+// indicator of a higher level request being handled etc.
+message Sample {
+ // The ids recorded here correspond to a Profile.location.id.
+ // The leaf is at location_id[0].
+ repeated uint64 location_id = 1;
+ // The type and unit of each value is defined by the corresponding
+ // entry in Profile.sample_type. All samples must have the same
+ // number of values, the same as the length of Profile.sample_type.
+ // When aggregating multiple samples into a single sample, the
+ // result has a list of values that is the element-wise sum of the
+ // lists of the originals.
+ repeated int64 value = 2;
+ // label includes additional context for this sample. It can include
+ // things like a thread id, allocation size, etc
+ repeated Label label = 3;
+}
+
+message Label {
+ int64 key = 1; // Index into string table
+
+ // At most one of the following must be present
+ int64 str = 2; // Index into string table
+ int64 num = 3;
+
+ // Should only be present when num is present.
+ // Specifies the units of num.
+ // Use arbitrary string (for example, "requests") as a custom count unit.
+ // If no unit is specified, consumer may apply heuristic to deduce the unit.
+ // Consumers may also interpret units like "bytes" and "kilobytes" as memory
+ // units and units like "seconds" and "nanoseconds" as time units,
+ // and apply appropriate unit conversions to these.
+ int64 num_unit = 4; // Index into string table
+}
+
+message Mapping {
+ // Unique nonzero id for the mapping.
+ uint64 id = 1;
+ // Address at which the binary (or DLL) is loaded into memory.
+ uint64 memory_start = 2;
+ // The limit of the address range occupied by this mapping.
+ uint64 memory_limit = 3;
+ // Offset in the binary that corresponds to the first mapped address.
+ uint64 file_offset = 4;
+ // The object this entry is loaded from. This can be a filename on
+ // disk for the main binary and shared libraries, or virtual
+ // abstractions like "[vdso]".
+ int64 filename = 5; // Index into string table
+ // A string that uniquely identifies a particular program version
+ // with high probability. E.g., for binaries generated by GNU tools,
+ // it could be the contents of the .note.gnu.build-id field.
+ int64 build_id = 6; // Index into string table
+
+ // The following fields indicate the resolution of symbolic info.
+ bool has_functions = 7;
+ bool has_filenames = 8;
+ bool has_line_numbers = 9;
+ bool has_inline_frames = 10;
+}
+
+// Describes function and line table debug information.
+message Location {
+ // Unique nonzero id for the location. A profile could use
+ // instruction addresses or any integer sequence as ids.
+ uint64 id = 1;
+ // The id of the corresponding profile.Mapping for this location.
+ // It can be unset if the mapping is unknown or not applicable for
+ // this profile type.
+ uint64 mapping_id = 2;
+ // The instruction address for this location, if available. It
+ // should be within [Mapping.memory_start...Mapping.memory_limit]
+ // for the corresponding mapping. A non-leaf address may be in the
+ // middle of a call instruction. It is up to display tools to find
+ // the beginning of the instruction if necessary.
+ uint64 address = 3;
+ // Multiple line indicates this location has inlined functions,
+ // where the last entry represents the caller into which the
+ // preceding entries were inlined.
+ //
+ // E.g., if memcpy() is inlined into printf:
+ // line[0].function_name == "memcpy"
+ // line[1].function_name == "printf"
+ repeated Line line = 4;
+ // Provides an indication that multiple symbols map to this location's
+ // address, for example due to identical code folding by the linker. In that
+ // case the line information above represents one of the multiple
+ // symbols. This field must be recomputed when the symbolization state of the
+ // profile changes.
+ bool is_folded = 5;
+}
+
+message Line {
+ // The id of the corresponding profile.Function for this line.
+ uint64 function_id = 1;
+ // Line number in source code.
+ int64 line = 2;
+}
+
+message Function {
+ // Unique nonzero id for the function.
+ uint64 id = 1;
+ // Name of the function, in human-readable form if available.
+ int64 name = 2; // Index into string table
+ // Name of the function, as identified by the system.
+ // For instance, it can be a C++ mangled name.
+ int64 system_name = 3; // Index into string table
+ // Source file containing the function.
+ int64 filename = 4; // Index into string table
+ // Line number in source file.
+ int64 start_line = 5;
+}
diff --git a/yt/yt/library/ytprof/queue-inl.h b/yt/yt/library/ytprof/queue-inl.h
new file mode 100644
index 00000000000..bc030d52fa0
--- /dev/null
+++ b/yt/yt/library/ytprof/queue-inl.h
@@ -0,0 +1,72 @@
+#ifndef QUEUE_INL_H_
+#error "Direct inclusion of this file is not allowed, include queue.h"
+#include "queue.h"
+#endif
+#undef QUEUE_INL_H_
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TStaticQueue::TStaticQueue(size_t logSize)
+ : Size_(1 << logSize)
+ , Buffer_(Size_)
+{ }
+
+template <class TFn>
+bool TStaticQueue::TryPush(const TFn& getIp)
+{
+ auto start = WriteSeq_.load(std::memory_order::relaxed);
+ auto end = ReadSeq_.load(std::memory_order::acquire) + Size_;
+
+ if (start + 1 >= end) {
+ return false;
+ }
+
+ int count = 0;
+ while (true) {
+ auto [ip, ok] = getIp();
+ if (!ok) {
+ break;
+ }
+
+ if (start + count + 1 >= end) {
+ return false;
+ }
+
+ Buffer_[ToIndex(start+count+1)] = reinterpret_cast<intptr_t>(ip);
+ count++;
+ }
+
+ Buffer_[ToIndex(start)] = count;
+ WriteSeq_.store(start+count+1, std::memory_order::release);
+ return true;
+}
+
+template <class TFn>
+bool TStaticQueue::TryPop(const TFn& onIp)
+{
+ auto start = ReadSeq_.load(std::memory_order::relaxed);
+ auto end = WriteSeq_.load(std::memory_order::acquire);
+
+ if (start == end) {
+ return false;
+ }
+
+ auto count = Buffer_[ToIndex(start)];
+ for (int i = 0; i < count; i++) {
+ onIp(reinterpret_cast<void*>(Buffer_[ToIndex(start+i+1)]));
+ }
+
+ ReadSeq_.store(start+count+1, std::memory_order::release);
+ return true;
+}
+
+size_t TStaticQueue::ToIndex(i64 seq) const
+{
+ return seq & (Size_ - 1);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/queue.h b/yt/yt/library/ytprof/queue.h
new file mode 100644
index 00000000000..ec6617d4131
--- /dev/null
+++ b/yt/yt/library/ytprof/queue.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <atomic>
+#include <optional>
+#include <vector>
+
+#include <util/generic/fwd.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/*!
+ * TStaticQueue is SPSC thread and signal safe queue that does not perform
+ * any allocations.
+ */
+class TStaticQueue
+{
+public:
+ inline TStaticQueue(size_t logSize);
+
+ template <class TFn>
+ bool TryPush(const TFn& getIp);
+
+ template <class TFn>
+ bool TryPop(const TFn& onIp);
+
+private:
+ const i64 Size_;
+
+ std::atomic<i64> WriteSeq_{0};
+ std::atomic<i64> ReadSeq_{0};
+
+ std::vector<intptr_t> Buffer_;
+
+ inline size_t ToIndex(i64 seq) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
+
+#define QUEUE_INL_H_
+#include "queue-inl.h"
+#undef QUEUE_INL_H_
diff --git a/yt/yt/library/ytprof/signal_safe_profiler.cpp b/yt/yt/library/ytprof/signal_safe_profiler.cpp
new file mode 100644
index 00000000000..4312e7527a2
--- /dev/null
+++ b/yt/yt/library/ytprof/signal_safe_profiler.cpp
@@ -0,0 +1,370 @@
+#include "signal_safe_profiler.h"
+#include "symbolize.h"
+
+#if defined(_linux_)
+#include <link.h>
+#include <sys/syscall.h>
+#include <sys/prctl.h>
+
+#include <util/system/yield.h>
+
+#include <csignal>
+#include <functional>
+
+#include <util/generic/yexception.h>
+#endif
+
+#include <yt/yt/core/misc/proc.h>
+
+#include <library/cpp/yt/cpu_clock/clock.h>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TProfileLocation::operator size_t() const
+{
+ size_t hash = Tid;
+ hash = CombineHashes(hash, std::hash<TString>()(ThreadName));
+
+ for (auto ip : Backtrace) {
+ hash = CombineHashes(hash, ip);
+ }
+
+ for (const auto& tag : Tags) {
+ hash = CombineHashes(hash,
+ CombineHashes(
+ std::hash<TString>{}(tag.first),
+ std::hash<std::variant<TString, i64>>{}(tag.second)));
+ }
+
+ return hash;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_WEAK void* AcquireFiberTagStorage()
+{
+ return nullptr;
+}
+
+Y_WEAK std::vector<std::pair<TString, std::variant<TString, i64>>> ReadFiberTags(void* /* storage */)
+{
+ return {};
+}
+
+Y_WEAK void ReleaseFiberTagStorage(void* /* storage */)
+{ }
+
+Y_WEAK TCpuInstant GetTraceContextTimingCheckpoint()
+{
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if not defined(_linux_)
+
+void TSignalSafeProfiler::Start()
+{ }
+
+void TSignalSafeProfiler::Stop()
+{ }
+
+NProto::Profile TSignalSafeProfiler::ReadProfile()
+{
+ return {};
+}
+
+void TSignalSafeProfiler::RecordSample(NBacktrace::TFramePointerCursor* /*cursor*/, i64 /*value*/)
+{ }
+
+void TSignalSafeProfiler::DequeueSamples()
+{ }
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+TSignalSafeProfiler::TSignalSafeProfiler(TSignalSafeProfilerOptions options)
+ : Options_(options)
+ , Queue_(options.RingBufferLogSize)
+{ }
+
+TSignalSafeProfiler::~TSignalSafeProfiler()
+{
+ YT_VERIFY(Stop_);
+}
+
+#if defined(_linux_)
+
+void TSignalSafeProfiler::Start()
+{
+ if (!IsProfileBuild()) {
+ throw yexception() << "frame pointers not available; rebuild with --build=profile";
+ }
+
+ EnableProfiler();
+ Stop_ = false;
+ BackgroundThread_ = std::thread([this] {
+ DequeueSamples();
+ });
+}
+
+void TSignalSafeProfiler::Stop()
+{
+ if (Stop_) {
+ return;
+ }
+
+ Stop_ = true;
+ DisableProfiler();
+ BackgroundThread_.join();
+}
+
+TCpuDuration GetActionRunTime()
+{
+ auto fiberStartTime = GetTraceContextTimingCheckpoint();
+ if (fiberStartTime == 0) {
+ return 0;
+ }
+
+ return GetCpuInstant() - fiberStartTime;
+}
+
+void TSignalSafeProfiler::RecordSample(NBacktrace::TFramePointerCursor* cursor, i64 value)
+{
+ int count = 0;
+ bool pushTid = false;
+ bool pushFiberStorage = false;
+ bool pushValue = false;
+ bool pushActionRunTime = false;
+ int tagIndex = 0;
+
+ auto tagsPtr = GetCpuProfilerTags();
+
+ uintptr_t threadName[2] = {};
+ prctl(PR_GET_NAME, (unsigned long)threadName, 0UL, 0UL, 0UL);
+ int namePushed = 0;
+
+ auto ok = Queue_.TryPush([&] () -> std::pair<const void*, bool> {
+ if (!pushValue) {
+ pushValue = true;
+ return {reinterpret_cast<const void*>(value), true};
+ }
+
+ if (!pushTid) {
+ pushTid = true;
+ return {reinterpret_cast<void*>(GetCurrentThreadId()), true};
+ }
+
+ if (Options_.RecordActionRunTime && !pushActionRunTime) {
+ pushActionRunTime = true;
+ return {reinterpret_cast<const void*>(GetActionRunTime()), true};
+ }
+
+ if (namePushed < 2) {
+ return {reinterpret_cast<const void*>(threadName[namePushed++]), true};
+ }
+
+ if (!pushFiberStorage) {
+ pushFiberStorage = true;
+ return {AcquireFiberTagStorage(), true};
+ }
+
+ if (tagIndex < MaxActiveTags) {
+ if (tagsPtr) {
+ auto tag = (*tagsPtr)[tagIndex].GetFromSignal();
+ tagIndex++;
+ return {reinterpret_cast<const void*>(tag.Release()), true};
+ } else {
+ tagIndex++;
+ return {nullptr, true};
+ }
+ }
+
+ if (count > Options_.MaxBacktraceSize) {
+ return {nullptr, false};
+ }
+
+ if (cursor->IsFinished()) {
+ return {nullptr, false};
+ }
+
+ auto ip = cursor->GetCurrentIP();
+ if (count != 0) {
+ // First IP points to next executing instruction.
+ // All other IP's are return addresses.
+ // Substract 1 to get accurate line information for profiler.
+ ip = reinterpret_cast<const void*>(reinterpret_cast<uintptr_t>(ip) - 1);
+ }
+
+ cursor->MoveNext();
+ count++;
+ return {ip, true};
+ });
+
+ if (!ok) {
+ QueueOverflows_++;
+ }
+}
+
+void TSignalSafeProfiler::DequeueSamples()
+{
+ TProfileLocation sample;
+ while (!Stop_) {
+ Sleep(Options_.DequeuePeriod);
+
+ while (true) {
+ sample.Backtrace.clear();
+ sample.Tags.clear();
+
+ std::optional<i64> value;
+ std::optional<size_t> tid;
+ std::optional<TCpuDuration> actionRunTime;
+ uintptr_t threadName[2] = {};
+ int namePopped = 0;
+ std::optional<void*> fiberStorage;
+
+ int tagIndex = 0;
+ bool ok = Queue_.TryPop([&] (void* ip) {
+ if (!value) {
+ value = reinterpret_cast<i64>(ip);
+ return;
+ }
+
+ if (!tid) {
+ tid = reinterpret_cast<size_t>(ip);
+ return;
+ }
+
+ if (Options_.RecordActionRunTime && !actionRunTime) {
+ actionRunTime = reinterpret_cast<TCpuDuration>(ip);
+ return;
+ }
+
+ if (namePopped < 2) {
+ threadName[namePopped++] = reinterpret_cast<uintptr_t>(ip);
+ return;
+ }
+
+ if (!fiberStorage) {
+ fiberStorage = ip;
+ return;
+ }
+
+ if (tagIndex < MaxActiveTags) {
+ auto tag = reinterpret_cast<TProfilerTag*>(ip);
+ if (tag) {
+ if (tag->StringValue) {
+ sample.Tags.emplace_back(tag->Name, *tag->StringValue);
+ } else {
+ sample.Tags.emplace_back(tag->Name, *tag->IntValue);
+ }
+ }
+ tagIndex++;
+ return;
+ }
+
+ sample.Backtrace.push_back(reinterpret_cast<ui64>(ip));
+ });
+
+ if (!ok) {
+ break;
+ }
+
+ sample.ThreadName = TString{reinterpret_cast<char*>(threadName)};
+ sample.Tid = *tid;
+ for (auto& tag : ReadFiberTags(*fiberStorage)) {
+ sample.Tags.push_back(std::move(tag));
+ }
+ ReleaseFiberTagStorage(*fiberStorage);
+
+ if (Options_.RecordActionRunTime) {
+ sample.Tags.emplace_back(
+ "action_run_time_us",
+ static_cast<i64>(CpuDurationToDuration(*actionRunTime).MicroSeconds()));
+ }
+
+ auto& counter = Counters_[sample];
+ counter.Count++;
+ counter.Total += *value;
+ }
+ }
+}
+
+NProto::Profile TSignalSafeProfiler::ReadProfile()
+{
+ NProto::Profile profile;
+ profile.add_string_table();
+
+ THashMap<TString, ui64> stringTable;
+ auto stringify = [&] (const TString& str) -> i64 {
+ if (auto it = stringTable.find(str); it != stringTable.end()) {
+ return it->second;
+ } else {
+ auto nameId = profile.string_table_size();
+ profile.add_string_table(str);
+ stringTable[str] = nameId;
+ return nameId;
+ }
+ };
+
+ AnnotateProfile(&profile, stringify);
+
+ THashMap<uintptr_t, ui64> locations;
+ for (const auto& [backtrace, counters] : Counters_) {
+ auto sample = profile.add_sample();
+
+ sample->add_value(counters.Count);
+ sample->add_value(EncodeValue(counters.Total));
+
+ auto label = sample->add_label();
+ label->set_key(stringify("tid"));
+ label->set_num(backtrace.Tid);
+
+ label = sample->add_label();
+ label->set_key(stringify("thread"));
+ label->set_str(stringify(backtrace.ThreadName));
+
+ for (auto tag : backtrace.Tags) {
+ auto label = sample->add_label();
+ label->set_key(stringify(tag.first));
+
+ if (auto intValue = std::get_if<i64>(&tag.second)) {
+ label->set_num(*intValue);
+ } else if (auto strValue = std::get_if<TString>(&tag.second)) {
+ label->set_str(stringify(*strValue));
+ }
+ }
+
+ for (auto ip : backtrace.Backtrace) {
+ auto it = locations.find(ip);
+ if (it != locations.end()) {
+ sample->add_location_id(it->second);
+ continue;
+ }
+
+ auto locationId = locations.size() + 1;
+
+ auto location = profile.add_location();
+ location->set_address(ip);
+ location->set_id(locationId);
+
+ sample->add_location_id(locationId);
+ locations[ip] = locationId;
+ }
+ }
+
+ if (QueueOverflows_ > 0) {
+ profile.add_comment(stringify("ytprof.queue_overflows=" + std::to_string(QueueOverflows_)));
+ }
+
+ return profile;
+}
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/signal_safe_profiler.h b/yt/yt/library/ytprof/signal_safe_profiler.h
new file mode 100644
index 00000000000..d09ba34dd3d
--- /dev/null
+++ b/yt/yt/library/ytprof/signal_safe_profiler.h
@@ -0,0 +1,95 @@
+#pragma once
+
+#include "queue.h"
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+#include <yt/yt/library/ytprof/api/api.h>
+
+#include <library/cpp/yt/memory/intrusive_ptr.h>
+#include <library/cpp/yt/memory/safe_memory_reader.h>
+
+#include <library/cpp/yt/backtrace/cursors/frame_pointer/frame_pointer_cursor.h>
+
+#include <util/generic/hash.h>
+
+#include <util/datetime/base.h>
+
+#include <thread>
+#include <array>
+#include <variant>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TProfileLocation
+{
+ size_t Tid = 0;
+ TString ThreadName;
+ std::vector<std::pair<TString, std::variant<TString, i64>>> Tags;
+ std::vector<ui64> Backtrace;
+
+ bool operator == (const TProfileLocation& other) const = default;
+ operator size_t() const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSignalSafeProfilerOptions
+{
+public:
+ TDuration DequeuePeriod = TDuration::MilliSeconds(100);
+ int MaxBacktraceSize = 256;
+ int RingBufferLogSize = 20; // 1 MiB
+ bool RecordActionRunTime = false;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSignalSafeProfiler
+{
+public:
+ explicit TSignalSafeProfiler(TSignalSafeProfilerOptions options = {});
+ virtual ~TSignalSafeProfiler();
+
+ void Start();
+ void Stop();
+
+ //! ReadProfile returns accumulated profile.
+ /*!
+ * This function may be called only after profiler is stopped.
+ */
+ NProto::Profile ReadProfile();
+
+protected:
+ const TSignalSafeProfilerOptions Options_;
+
+ TSafeMemoryReader Reader_;
+
+ std::atomic<bool> Stop_ = true;
+ std::atomic<i64> QueueOverflows_ = 0;
+
+ TStaticQueue Queue_;
+ std::thread BackgroundThread_;
+
+ struct TProfileCounter
+ {
+ i64 Count = 0;
+ i64 Total = 0;
+ };
+
+ THashMap<TProfileLocation, TProfileCounter> Counters_;
+
+ virtual void EnableProfiler() = 0;
+ virtual void DisableProfiler() = 0;
+ virtual void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) = 0;
+ virtual i64 EncodeValue(i64 value) = 0;
+
+ void RecordSample(NBacktrace::TFramePointerCursor* cursor, i64 value);
+
+ void DequeueSamples();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/spinlock_profiler.cpp b/yt/yt/library/ytprof/spinlock_profiler.cpp
new file mode 100644
index 00000000000..a9b3b6b2482
--- /dev/null
+++ b/yt/yt/library/ytprof/spinlock_profiler.cpp
@@ -0,0 +1,225 @@
+#include "spinlock_profiler.h"
+
+#include <library/cpp/yt/backtrace/cursors/interop/interop.h>
+
+#include <absl/base/internal/spinlock.h>
+#include <absl/base/internal/cycleclock.h>
+
+#include <util/system/yield.h>
+
+#include <mutex>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TSpinlockProfiler::TSpinlockProfiler(TSpinlockProfilerOptions options)
+ : TSignalSafeProfiler(options)
+ , Options_(options)
+{ }
+
+TSpinlockProfiler::~TSpinlockProfiler()
+{
+ Stop();
+}
+
+std::atomic<int> TSpinlockProfiler::SamplingRate_;
+std::atomic<TSpinlockProfiler*> TSpinlockProfiler::ActiveProfiler_;
+std::atomic<bool> TSpinlockProfiler::HandlingEvent_;
+std::once_flag TSpinlockProfiler::HookInitialized_;
+
+void TSpinlockProfiler::EnableProfiler()
+{
+ std::call_once(HookInitialized_, [] {
+ absl::base_internal::RegisterSpinLockProfiler(&TSpinlockProfiler::OnEvent);
+ return true;
+ });
+
+ TSpinlockProfiler* expected = nullptr;
+ if (!ActiveProfiler_.compare_exchange_strong(expected, this)) {
+ throw yexception() << "Another instance of spinlock profiler is running";
+ }
+ SamplingRate_ = Options_.ProfileFraction;
+}
+
+void TSpinlockProfiler::DisableProfiler()
+{
+ SamplingRate_ = 0;
+ ActiveProfiler_ = nullptr;
+ while (HandlingEvent_) {
+ SchedYield();
+ }
+}
+
+void TSpinlockProfiler::RecordEvent(const void* /*lock*/, int64_t waitCycles)
+{
+ unw_context_t uwContext;
+ YT_VERIFY(unw_getcontext(&uwContext) == 0);
+
+ unw_cursor_t unwCursor;
+ YT_VERIFY(unw_init_local(&unwCursor, &uwContext) == 0);
+
+ auto fpCursorContext = NBacktrace::FramePointerCursorContextFromLibunwindCursor(unwCursor);
+ NBacktrace::TFramePointerCursor fpCursor(&Reader_, fpCursorContext);
+ RecordSample(&fpCursor, waitCycles);
+}
+
+static thread_local int SpinlockEventCount;
+
+void TSpinlockProfiler::OnEvent(const void* lock, int64_t waitCycles)
+{
+ auto samplingRate = SamplingRate_.load(std::memory_order::relaxed);
+ if (samplingRate == 0) {
+ return;
+ }
+
+ if (SpinlockEventCount < samplingRate) {
+ SpinlockEventCount++;
+ return;
+ }
+
+ SpinlockEventCount = 0;
+ while (HandlingEvent_.exchange(true)) {
+ SchedYield();
+ }
+
+ if (auto profiler = ActiveProfiler_.load(); profiler) {
+ profiler->RecordEvent(lock, waitCycles);
+ }
+
+ HandlingEvent_.store(false);
+}
+
+void TSpinlockProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify)
+{
+ auto sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("sample"));
+ sampleType->set_unit(stringify("count"));
+
+ sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("cpu"));
+ sampleType->set_unit(stringify("nanoseconds"));
+
+ auto periodType = profile->mutable_period_type();
+ periodType->set_type(stringify("sample"));
+ periodType->set_unit(stringify("count"));
+
+ profile->set_period(Options_.ProfileFraction);
+}
+
+i64 TSpinlockProfiler::EncodeValue(i64 value)
+{
+ return value / absl::base_internal::CycleClock::Frequency() * 1e9;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+TBlockingProfiler::TBlockingProfiler(TSpinlockProfilerOptions options)
+ : TSignalSafeProfiler(options)
+ , Options_(options)
+{ }
+
+TBlockingProfiler::~TBlockingProfiler()
+{
+ Stop();
+}
+
+std::atomic<int> TBlockingProfiler::SamplingRate_;
+std::atomic<TBlockingProfiler*> TBlockingProfiler::ActiveProfiler_;
+std::atomic<bool> TBlockingProfiler::HandlingEvent_;
+std::once_flag TBlockingProfiler::HookInitialized_;
+
+void TBlockingProfiler::EnableProfiler()
+{
+ std::call_once(HookInitialized_, [] {
+ NThreading::RegisterSpinWaitSlowPathHook(&TBlockingProfiler::OnEvent);
+ return true;
+ });
+
+ TBlockingProfiler* expected = nullptr;
+ if (!ActiveProfiler_.compare_exchange_strong(expected, this)) {
+ throw yexception() << "Another instance of spinlock profiler is running";
+ }
+ SamplingRate_ = Options_.ProfileFraction;
+}
+
+void TBlockingProfiler::DisableProfiler()
+{
+ SamplingRate_ = 0;
+ ActiveProfiler_ = nullptr;
+ while (HandlingEvent_) {
+ SchedYield();
+ }
+}
+
+void TBlockingProfiler::RecordEvent(
+ TCpuDuration cpuDelay,
+ const ::TSourceLocation& /*location*/,
+ NThreading::ESpinLockActivityKind /*activityKind*/)
+{
+ unw_context_t unwContext;
+ YT_VERIFY(unw_getcontext(&unwContext) == 0);
+
+ unw_cursor_t unwCursor;
+ YT_VERIFY(unw_init_local(&unwCursor, &unwContext) == 0);
+
+ auto fpCursorContext = NBacktrace::FramePointerCursorContextFromLibunwindCursor(unwCursor);
+ NBacktrace::TFramePointerCursor fpCursor(&Reader_, fpCursorContext);
+ RecordSample(&fpCursor, cpuDelay);
+}
+
+static thread_local int YTSpinlockEventCount;
+
+void TBlockingProfiler::OnEvent(
+ TCpuDuration cpuDelay,
+ const ::TSourceLocation& location,
+ NThreading::ESpinLockActivityKind activityKind)
+{
+ auto samplingRate = SamplingRate_.load(std::memory_order::relaxed);
+ if (samplingRate == 0) {
+ return;
+ }
+
+ if (YTSpinlockEventCount < samplingRate) {
+ YTSpinlockEventCount++;
+ return;
+ }
+
+ YTSpinlockEventCount = 0;
+ while (HandlingEvent_.exchange(true)) {
+ SchedYield();
+ }
+
+ if (auto profiler = ActiveProfiler_.load(); profiler) {
+ profiler->RecordEvent(cpuDelay, location, activityKind);
+ }
+
+ HandlingEvent_.store(false);
+}
+
+void TBlockingProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify)
+{
+ auto sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("sample"));
+ sampleType->set_unit(stringify("count"));
+
+ sampleType = profile->add_sample_type();
+ sampleType->set_type(stringify("cpu"));
+ sampleType->set_unit(stringify("nanoseconds"));
+
+ auto periodType = profile->mutable_period_type();
+ periodType->set_type(stringify("sample"));
+ periodType->set_unit(stringify("count"));
+
+ profile->set_period(Options_.ProfileFraction);
+}
+
+i64 TBlockingProfiler::EncodeValue(i64 value)
+{
+ return CpuDurationToDuration(value).NanoSeconds();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/spinlock_profiler.h b/yt/yt/library/ytprof/spinlock_profiler.h
new file mode 100644
index 00000000000..6df576c3b7a
--- /dev/null
+++ b/yt/yt/library/ytprof/spinlock_profiler.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#include "signal_safe_profiler.h"
+
+#include <library/cpp/yt/threading/spin_wait_hook.h>
+
+#include <mutex>
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TSpinlockProfilerOptions
+ : public TSignalSafeProfilerOptions
+{
+ int ProfileFraction = 100;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TSpinlockProfiler profiles wait events from absl spinlock.
+class TSpinlockProfiler
+ : public TSignalSafeProfiler
+{
+public:
+ explicit TSpinlockProfiler(TSpinlockProfilerOptions options);
+ ~TSpinlockProfiler();
+
+private:
+ const TSpinlockProfilerOptions Options_;
+
+ static std::atomic<int> SamplingRate_;
+ static std::atomic<TSpinlockProfiler*> ActiveProfiler_;
+ static std::atomic<bool> HandlingEvent_;
+ static std::once_flag HookInitialized_;
+
+ void EnableProfiler() override;
+ void DisableProfiler() override;
+ void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override;
+ i64 EncodeValue(i64 value) override;
+
+ static void OnEvent(const void *lock, int64_t waitCycles);
+ void RecordEvent(const void *lock, int64_t waitCycles);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TBlockingProfiler profiles wait events from yt spinlocks.
+class TBlockingProfiler
+ : public TSignalSafeProfiler
+{
+public:
+ explicit TBlockingProfiler(TSpinlockProfilerOptions options);
+ ~TBlockingProfiler();
+
+private:
+ const TSpinlockProfilerOptions Options_;
+
+ static std::atomic<int> SamplingRate_;
+ static std::atomic<TBlockingProfiler*> ActiveProfiler_;
+ static std::atomic<bool> HandlingEvent_;
+ static std::once_flag HookInitialized_;
+
+ void EnableProfiler() override;
+ void DisableProfiler() override;
+ void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override;
+ i64 EncodeValue(i64 value) override;
+
+ static void OnEvent(
+ TCpuDuration cpuDelay,
+ const ::TSourceLocation& location,
+ NThreading::ESpinLockActivityKind activityKind);
+
+ void RecordEvent(
+ TCpuDuration cpuDelay,
+ const ::TSourceLocation& location,
+ NThreading::ESpinLockActivityKind activityKind);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/symbolize.cpp b/yt/yt/library/ytprof/symbolize.cpp
new file mode 100644
index 00000000000..553a34c4e34
--- /dev/null
+++ b/yt/yt/library/ytprof/symbolize.cpp
@@ -0,0 +1,809 @@
+#include "symbolize.h"
+
+#include <library/cpp/yt/assert/assert.h>
+
+#include <util/folder/path.h>
+
+#include <util/generic/yexception.h>
+#include <util/generic/hash.h>
+
+#include <util/string/printf.h>
+
+#include <util/system/filemap.h>
+#include <util/system/type_name.h>
+#include <util/system/unaligned_mem.h>
+
+#include <dlfcn.h>
+#include <link.h>
+#include <elf.h>
+
+#ifndef YT_NO_AUXV
+#include <sys/auxv.h>
+#endif
+
+#include <exception>
+#include <cxxabi.h>
+
+namespace NYT::NYTProf {
+
+class TDLAddrSymbolizer
+{
+public:
+ explicit TDLAddrSymbolizer(NProto::Profile* profile)
+ : Profile_(profile)
+ { }
+
+ void Symbolize()
+ {
+ for (int i = 0; i < Profile_->function_size(); i++) {
+ auto function = Profile_->mutable_function(i);
+
+ void* ip = reinterpret_cast<void*>(function->id());
+
+ Dl_info dlinfo{};
+ if (dladdr(ip, &dlinfo) == 0) {
+ continue;
+ }
+
+ TString name;
+ TString demangledName;
+ if (!dlinfo.dli_sname) {
+ auto offset = reinterpret_cast<intptr_t>(ip) - reinterpret_cast<intptr_t>(dlinfo.dli_fbase);
+ auto filename = TFsPath{dlinfo.dli_fname}.Basename();
+ name = Sprintf("%p", reinterpret_cast<void*>(offset)) + "@" + filename;
+ demangledName = name;
+ } else {
+ name = dlinfo.dli_sname;
+ demangledName = CppDemangle(dlinfo.dli_sname);
+ }
+
+ function->set_name(SymbolizeString(demangledName));
+ function->set_system_name(SymbolizeString(name));
+ }
+ }
+
+private:
+ NProto::Profile* const Profile_;
+
+ THashMap<TString, ui64> Strings_;
+
+ ui64 SymbolizeString(const TString& str)
+ {
+ auto it = Strings_.find(str);
+ if (it != Strings_.end()) {
+ return it->second;
+ }
+
+ auto id = Profile_->string_table_size();
+ Strings_[str] = id;
+ Profile_->add_string_table(str);
+ return id;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+using TElfAddr = ElfW(Addr);
+using TElfDyn = ElfW(Dyn);
+using TElfEhdr = ElfW(Ehdr);
+using TElfOff = ElfW(Off);
+using TElfPhdr = ElfW(Phdr);
+using TElfShdr = ElfW(Shdr);
+using TElfNhdr = ElfW(Nhdr);
+using TElfSym = ElfW(Sym);
+using TElfWord = ElfW(Word);
+
+class TElf final
+{
+public:
+ class TSection
+ {
+ public:
+ const TElfShdr& Header;
+
+ const char* Name() const
+ {
+ if (!Elf_.SectionNames_) {
+ throw yexception() << "Section names are not initialized";
+ }
+
+ if (Header.sh_name > Elf_.SectionNamesSize_) {
+ throw yexception() << "Section name point outside of strings table";
+ }
+
+ return Elf_.SectionNames_ + Header.sh_name;
+ }
+
+ const char* begin() const
+ {
+ return Elf_.Mapped_ + Header.sh_offset;
+ }
+
+ const char* end() const
+ {
+ return begin() + size();
+ }
+
+ size_t size() const
+ {
+ return Header.sh_size;
+ }
+
+ TSection(const TElfShdr& header, const TElf& elf)
+ : Header(header)
+ , Elf_(elf)
+ { }
+
+ private:
+ const TElf& Elf_;
+ };
+
+ explicit TElf(const TString& path)
+ : FileMap_(path)
+ {
+ FileMap_.Map(0, FileMap_.GetFile().GetLength());
+
+ ElfSize_ = FileMap_.MappedSize();
+ Mapped_ = reinterpret_cast<const char*>(FileMap_.Ptr());
+
+ if (ElfSize_ < sizeof(TElfEhdr)) {
+ throw yexception() << "The size of ELF file is too small: " << ElfSize_;
+ }
+
+ Header_ = reinterpret_cast<const TElfEhdr*>(Mapped_);
+
+ if (memcmp(Header_->e_ident, "\x7F""ELF", 4) != 0) {
+ throw yexception() << "The file is not ELF according to magic";
+ }
+
+ TElfOff sectionHeaderOffset = Header_->e_shoff;
+ uint16_t sectionHeaderNumEntries = Header_->e_shnum;
+
+ if (!sectionHeaderOffset ||
+ !sectionHeaderNumEntries ||
+ sectionHeaderOffset + sectionHeaderNumEntries * sizeof(TElfShdr) > ElfSize_)
+ {
+ throw yexception() << "The ELF is truncated (section header points after end of file)";
+ }
+
+ SectionHeaders_ = reinterpret_cast<const TElfShdr*>(Mapped_ + sectionHeaderOffset);
+
+ auto sectionStrtab = FindSection([&] (const TSection& section, size_t idx) {
+ return section.Header.sh_type == SHT_STRTAB && Header_->e_shstrndx == idx;
+ });
+
+ if (!sectionStrtab) {
+ throw yexception() << "The ELF doesn't have string table with section names";
+ }
+
+ TElfOff sectionNamesOffset = sectionStrtab->Header.sh_offset;
+ if (sectionNamesOffset >= ElfSize_) {
+ throw yexception() << "The ELF is truncated (section names string table points after end of file)";
+ }
+ if (sectionNamesOffset + SectionNamesSize_ > ElfSize_) {
+ throw yexception() << "The ELF is truncated (section names string table is truncated)";
+ }
+
+ SectionNames_ = reinterpret_cast<const char *>(Mapped_ + sectionNamesOffset);
+ SectionNamesSize_ = sectionStrtab->Header.sh_offset;
+
+ TElfOff programHeaderOffset = Header_->e_phoff;
+ uint16_t programHeaderNumEntries = Header_->e_phnum;
+
+ if (!programHeaderOffset ||
+ !programHeaderNumEntries ||
+ programHeaderOffset + programHeaderNumEntries * sizeof(TElfPhdr) > ElfSize_
+ ) {
+ throw yexception() << "The ELF is truncated (program header points after end of file)";
+ }
+
+ ProgramHeaders_ = reinterpret_cast<const TElfPhdr*>(Mapped_ + programHeaderOffset);
+ }
+
+ bool IterateSections(std::function<bool(const TSection& section, size_t idx)> pred) const
+ {
+ for (size_t idx = 0; idx < Header_->e_shnum; ++idx) {
+ TSection section(SectionHeaders_[idx], *this);
+
+ if (section.Header.sh_offset + section.Header.sh_size > ElfSize_) {
+ continue;
+ }
+
+ if (pred(section, idx)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ std::optional<TSection> FindSection(std::function<bool(const TSection& section, size_t idx)> pred) const
+ {
+ std::optional<TSection> result;
+
+ IterateSections([&] (const TSection& section, size_t idx) {
+ if (pred(section, idx)) {
+ result.emplace(section);
+ return true;
+ }
+ return false;
+ });
+
+ return result;
+ }
+
+ std::optional<TSection> FindSectionByName(const char* name) const
+ {
+ return FindSection([&] (const TSection& section, size_t) { return 0 == strcmp(name, section.Name()); });
+ }
+
+ const char* begin() const { return Mapped_; }
+ const char* end() const { return Mapped_ + ElfSize_; }
+ size_t size() const { return ElfSize_; }
+
+ TString GetBuildId() const
+ {
+ for (size_t idx = 0; idx < Header_->e_phnum; ++idx) {
+ const TElfPhdr& phdr = ProgramHeaders_[idx];
+
+ if (phdr.p_type == PT_NOTE) {
+ return GetBuildId(Mapped_ + phdr.p_offset, phdr.p_filesz);
+ }
+ }
+
+ return {};
+ }
+
+ static TString GetBuildId(const char* nhdrPos, size_t nhdrSize)
+ {
+ const char* nhdrEnd = nhdrPos + nhdrSize;
+
+ while (nhdrPos < nhdrEnd) {
+ TElfNhdr nhdr = ReadUnaligned<TElfNhdr>(nhdrPos);
+
+ nhdrPos += sizeof(TElfNhdr) + nhdr.n_namesz;
+ if (nhdr.n_type == NT_GNU_BUILD_ID) {
+ const char* build_id = nhdrPos;
+ return {build_id, nhdr.n_descsz};
+ }
+ nhdrPos += nhdr.n_descsz;
+ }
+
+ return {};
+ }
+
+private:
+ TFileMap FileMap_;
+
+ size_t ElfSize_ = 0;
+ const char* Mapped_ = nullptr;
+ const TElfEhdr* Header_;
+ const TElfShdr* SectionHeaders_;
+ const TElfPhdr* ProgramHeaders_;
+
+ const char* SectionNames_ = nullptr;
+ size_t SectionNamesSize_ = 0;
+};
+
+using TElfPtr = std::shared_ptr<TElf>;
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::optional<std::pair<void*, void*>> GetExecutableRange(dl_phdr_info* info)
+{
+ const TElfPhdr* load = nullptr;
+ for (int i = 0; i < info->dlpi_phnum; i++) {
+ load = &(info->dlpi_phdr[i]);
+ if (load->p_type == PT_LOAD && (load->p_flags & PF_X) != 0) {
+ break;
+ }
+ }
+
+ if (!load) {
+ return {};
+ }
+
+ return std::pair{
+ reinterpret_cast<void*>(info->dlpi_addr + load->p_vaddr),
+ reinterpret_cast<void*>(info->dlpi_addr + load->p_vaddr + load->p_memsz)
+ };
+}
+
+#if defined(_msan_enabled_)
+extern "C" void __msan_unpoison_string(const volatile void* a);
+#endif
+
+class TSymbolIndex
+{
+public:
+ TSymbolIndex()
+ {
+ dl_iterate_phdr(CollectSymbols, this);
+
+ std::sort(Objects_.begin(), Objects_.end(), [] (const TObject& a, const TObject& b) { return a.AddressBegin < b.AddressBegin; });
+ std::sort(Symbols_.begin(), Symbols_.end(), [] (const TSymbol& a, const TSymbol& b) { return a.AddressBegin < b.AddressBegin; });
+
+ /// We found symbols both from loaded program headers and from ELF symbol tables.
+ Symbols_.erase(std::unique(Symbols_.begin(), Symbols_.end(), [] (const TSymbol &a, const TSymbol& b) {
+ return a.AddressBegin == b.AddressBegin && a.AddressEnd == b.AddressEnd;
+ }), Symbols_.end());
+ }
+
+ static int CollectSymbols(dl_phdr_info* info, size_t, void* ptr)
+ {
+ TSymbolIndex* symbolIndex = reinterpret_cast<TSymbolIndex*>(ptr);
+
+ symbolIndex->CollectSymbolsFromProgramHeaders(info);
+ symbolIndex->CollectSymbolsFromElf(info);
+
+ /* Continue iterations */
+ return 0;
+ }
+
+ struct TSymbol
+ {
+ const void* AddressBegin;
+ const void* AddressEnd;
+ const char* Name;
+ };
+
+ struct TObject
+ {
+ const void* AddressBegin;
+ const void* AddressEnd;
+
+ TString Name;
+ TString BuildId;
+
+ TElfPtr Elf;
+ };
+
+ const TSymbol* FindSymbol(const void* address) const
+ {
+ return Find(address, Symbols_);
+ }
+
+ const TObject* FindObject(const void* address) const
+ {
+ return Find(address, Objects_);
+ }
+
+ const std::vector<TSymbol>& Symbols() const
+ {
+ return Symbols_;
+ }
+
+ const std::vector<TObject>& Objects() const
+ {
+ return Objects_;
+ }
+
+ static TString GetBuildId(dl_phdr_info* info)
+ {
+ for (size_t header_index = 0; header_index < info->dlpi_phnum; ++header_index) {
+ const auto& phdr = info->dlpi_phdr[header_index];
+ if (phdr.p_type != PT_NOTE)
+ continue;
+
+ return TElf::GetBuildId(reinterpret_cast<const char *>(info->dlpi_addr + phdr.p_vaddr), phdr.p_memsz);
+ }
+ return {};
+ }
+
+private:
+ std::vector<TSymbol> Symbols_;
+ std::vector<TObject> Objects_;
+
+ THashMap<TString, TElfPtr> ObjectNameToElf_;
+
+ template <typename T>
+ static const T* Find(const void* address, const std::vector<T>& vec)
+ {
+ auto it = std::lower_bound(vec.begin(), vec.end(), address, [] (const T& symbol, const void* addr) {
+ return symbol.AddressBegin <= addr;
+ });
+
+ if (it == vec.begin()) {
+ return nullptr;
+ } else {
+ --it; /// Last range that has left boundary less or equals than address.
+ }
+
+ if (address >= it->AddressBegin && address < it->AddressEnd) {
+ return &*it;
+ } else {
+ return nullptr;
+ }
+ }
+
+ void CollectSymbolsFromProgramHeaders(dl_phdr_info* info)
+ {
+ /* Iterate over all headers of the current shared lib
+ * (first call is for the executable itself)
+ */
+ for (size_t headerIndex = 0; headerIndex < info->dlpi_phnum; ++headerIndex) {
+ /* Further processing is only needed if the dynamic section is reached
+ */
+ if (info->dlpi_phdr[headerIndex].p_type != PT_DYNAMIC)
+ continue;
+
+ /* Get a pointer to the first entry of the dynamic section.
+ * It's address is the shared lib's address + the virtual address
+ */
+ const auto* dynBegin = reinterpret_cast<const TElfDyn*>(info->dlpi_addr + info->dlpi_phdr[headerIndex].p_vaddr);
+
+ /// For unknown reason, addresses are sometimes relative sometimes absolute.
+ auto correctAddress = [] (TElfAddr base, TElfAddr ptr) {
+ return ptr > base ? ptr : base + ptr;
+ };
+
+ /* Iterate over all entries of the dynamic section until the
+ * end of the symbol table is reached. This is indicated by
+ * an entry with d_tag == DT_NULL.
+ */
+
+ size_t symCnt = 0;
+ for (const auto * it = dynBegin; it->d_tag != DT_NULL; ++it) {
+ if (it->d_tag == DT_GNU_HASH) {
+ /// This code based on Musl-libc.
+
+ const uint32_t* buckets = nullptr;
+ const uint32_t* hashval = nullptr;
+
+ const auto* hash = reinterpret_cast<const TElfWord*>(correctAddress(info->dlpi_addr, it->d_un.d_ptr));
+
+ buckets = hash + 4 + (hash[2] * sizeof(size_t) / 4);
+
+ for (TElfWord index = 0; index < hash[0]; ++index) {
+ if (buckets[index] > symCnt) {
+ symCnt = buckets[index];
+ }
+ }
+
+ if (symCnt) {
+ symCnt -= hash[1];
+ hashval = buckets + hash[0] + symCnt;
+ do {
+ ++symCnt;
+ } while (!(*hashval++ & 1));
+ }
+
+ break;
+ }
+ }
+
+ if (!symCnt) {
+ continue;
+ }
+
+ const char* strtab = nullptr;
+ for (const auto* it = dynBegin; it->d_tag != DT_NULL; ++it) {
+ if (it->d_tag == DT_STRTAB) {
+ strtab = reinterpret_cast<const char *>(correctAddress(info->dlpi_addr, it->d_un.d_ptr));
+ break;
+ }
+ }
+
+ if (!strtab) {
+ continue;
+ }
+
+ for (const auto* it = dynBegin; it->d_tag != DT_NULL; ++it) {
+ if (it->d_tag == DT_SYMTAB) {
+ /* Get the pointer to the first entry of the symbol table */
+ const auto* elfSym = reinterpret_cast<const TElfSym*>(correctAddress(info->dlpi_addr, it->d_un.d_ptr));
+
+ /* Iterate over the symbol table */
+ for (TElfWord symIndex = 0; symIndex < static_cast<TElfWord>(symCnt); ++symIndex) {
+ /// We are not interested in empty symbols.
+ if (!elfSym[symIndex].st_size) {
+ continue;
+ }
+
+ /* Get the name of the sym_index-th symbol.
+ * This is located at the address of st_name relative to the beginning of the string table.
+ */
+ const auto* symName = &strtab[elfSym[symIndex].st_name];
+ if (!symName) {
+ continue;
+ }
+
+ Symbols_.push_back(TSymbol{
+ .AddressBegin = reinterpret_cast<const void *>(info->dlpi_addr + elfSym[symIndex].st_value),
+ .AddressEnd = reinterpret_cast<const void *>(info->dlpi_addr + elfSym[symIndex].st_value + elfSym[symIndex].st_size),
+ .Name = symName,
+ });
+ }
+
+ break;
+ }
+ }
+ }
+ }
+
+ void CollectSymbolsFromElf(dl_phdr_info* info)
+ {
+ /// MSan does not know that the program segments in memory are initialized.
+#if defined(_msan_enabled_)
+ __msan_unpoison_string(info->dlpi_name);
+#endif
+
+ TString objectName = info->dlpi_name;
+ auto buildId = GetBuildId(info);
+
+ /// If the name is empty and there is a non-empty build-id - it's main executable.
+ /// Find a elf file for the main executable and set the build-id.
+ if (objectName.empty()) {
+ objectName = TFsPath{"/proc/self/exe"}.ReadLink();
+ } else {
+ TFsPath debugInfoPath = TFsPath("/usr/lib/debug") / TFsPath{objectName}.Basename();
+ if (debugInfoPath.Exists()) {
+ objectName = debugInfoPath;
+ }
+ }
+
+ auto range = GetExecutableRange(info);
+ if (!range) {
+ return;
+ }
+
+ TObject object {
+ .AddressBegin = range->first,
+ .AddressEnd = range->second,
+ .Name = objectName,
+ .BuildId = buildId,
+ };
+
+ TElfPtr elf;
+ if (auto it = ObjectNameToElf_.find(objectName); it != ObjectNameToElf_.end()) {
+ elf = it->second;
+ } else {
+ if (TFsPath{objectName}.Exists()) {
+ elf = std::make_shared<TElf>(objectName);
+ if (elf->GetBuildId() != buildId) {
+ elf.reset();
+ }
+ }
+
+ YT_VERIFY(ObjectNameToElf_.emplace(objectName, elf).second);
+ }
+
+ object.Elf = elf;
+
+ if (elf) {
+ SearchAndCollectSymbolsFromELFSymbolTable(info, *elf, SHT_SYMTAB, ".strtab");
+ }
+
+ Objects_.push_back(std::move(object));
+ }
+
+ bool SearchAndCollectSymbolsFromELFSymbolTable(
+ dl_phdr_info* info,
+ const TElf& elf,
+ unsigned sectionHeaderType,
+ const char* stringTableName)
+ {
+ std::optional<TElf::TSection> symbolTable;
+ std::optional<TElf::TSection> stringTable;
+
+ if (!elf.IterateSections([&] (const TElf::TSection& section, size_t) {
+ if (section.Header.sh_type == sectionHeaderType) {
+ symbolTable.emplace(section);
+ } else if (section.Header.sh_type == SHT_STRTAB && 0 == strcmp(section.Name(), stringTableName)) {
+ stringTable.emplace(section);
+ }
+
+ return (symbolTable && stringTable);
+ })) {
+ return false;
+ }
+
+ CollectSymbolsFromELFSymbolTable(info, elf, *symbolTable, *stringTable);
+ return true;
+ }
+
+ void CollectSymbolsFromELFSymbolTable(
+ dl_phdr_info* info,
+ const TElf& elf,
+ const TElf::TSection& symbol_table,
+ const TElf::TSection& string_table)
+ {
+ const TElfSym* symbolTableEntry = reinterpret_cast<const TElfSym*>(symbol_table.begin());
+ const TElfSym* symbolTableEnd = reinterpret_cast<const TElfSym*>(symbol_table.end());
+
+ const char* strings = string_table.begin();
+ for (; symbolTableEntry < symbolTableEnd; ++symbolTableEntry) {
+ if (!symbolTableEntry->st_name ||
+ !symbolTableEntry->st_value ||
+ !symbolTableEntry->st_size ||
+ strings + symbolTableEntry->st_name >= elf.end())
+ {
+ continue;
+ }
+
+ /// Find the name in strings table.
+ const auto* symbolName = strings + symbolTableEntry->st_name;
+ if (!symbolName) {
+ continue;
+ }
+
+ Symbols_.push_back(TSymbolIndex::TSymbol{
+ .AddressBegin = reinterpret_cast<const void*>(info->dlpi_addr + symbolTableEntry->st_value),
+ .AddressEnd = reinterpret_cast<const void *>(info->dlpi_addr + symbolTableEntry->st_value + symbolTableEntry->st_size),
+ .Name = symbolName,
+ });
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TString GetVersion()
+{
+ return "0.3";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSymbolIndexSymbolizer
+{
+public:
+ explicit TSymbolIndexSymbolizer(NProto::Profile* profile)
+ : Profile_(profile)
+ { }
+
+ void Symbolize(bool filesOnly)
+ {
+ for (const auto& object : SymbolIndex_.Objects()) {
+ SymbolizeObject(&object);
+ }
+
+ for (int index = 0; index < Profile_->location_size(); index++) {
+ auto location = Profile_->mutable_location(index);
+ void* ip = reinterpret_cast<void*>(location->address());
+
+ if (auto object = SymbolIndex_.FindObject(ip)) {
+ location->set_mapping_id(SymbolizeObject(object));
+ }
+ }
+
+ Profile_->add_comment(SymbolizeString("generated by ytprof " + GetVersion()));
+
+ if (filesOnly) {
+ return;
+ }
+
+ for (int index = 0; index < Profile_->function_size(); index++) {
+ auto function = Profile_->mutable_function(index);
+
+ void* ip = reinterpret_cast<void*>(function->id());
+
+ if (auto symbol = SymbolIndex_.FindSymbol(ip)) {
+ function->set_name(SymbolizeString(CppDemangle(symbol->Name)));
+ function->set_system_name(SymbolizeString(symbol->Name));
+ } else if (auto object = SymbolIndex_.FindObject(ip)) {
+ auto offset = reinterpret_cast<intptr_t>(ip) - reinterpret_cast<intptr_t>(object->AddressBegin);
+ auto filename = TFsPath{object->Name}.Basename();
+ auto name = Sprintf("%p", reinterpret_cast<void*>(offset)) + "@" + filename;
+
+ function->set_name(SymbolizeString(name));
+ function->set_system_name(SymbolizeString(name));
+ }
+ }
+ }
+
+private:
+ NProto::Profile* const Profile_;
+
+ TSymbolIndex SymbolIndex_;
+
+ THashMap<const TSymbolIndex::TObject*, ui64> Objects_;
+ THashMap<TString, ui64> Strings_;
+
+ ui64 SymbolizeString(const TString& str)
+ {
+ auto it = Strings_.find(str);
+ if (it != Strings_.end()) {
+ return it->second;
+ }
+
+ auto id = Profile_->string_table_size();
+ Strings_[str] = id;
+ Profile_->add_string_table(str);
+ return id;
+ }
+
+ ui64 SymbolizeObject(const TSymbolIndex::TObject* object)
+ {
+ auto it = Objects_.find(object);
+ if (it != Objects_.end()) {
+ return it->second;
+ }
+
+ auto mapping = Profile_->add_mapping();
+ mapping->set_id(Profile_->mapping_size());
+ Objects_[object] = mapping->id();
+
+ mapping->set_memory_start(reinterpret_cast<ui64>(object->AddressBegin));
+ mapping->set_memory_limit(reinterpret_cast<ui64>(object->AddressEnd));
+ mapping->set_filename(SymbolizeString(object->Name));
+ return mapping->id();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+static int OnVdsoPhdr(struct dl_phdr_info *info, size_t /* size */, void *data)
+{
+#ifndef YT_NO_AUXV
+ auto vdso = (uintptr_t) getauxval(AT_SYSINFO_EHDR);
+#else
+ uintptr_t vdso = 0;
+#endif
+ auto vdsoRange = reinterpret_cast<std::pair<void*, void*>*>(data);
+ if (info->dlpi_addr == vdso) {
+ auto range = GetExecutableRange(info);
+ if (range) {
+ *vdsoRange = *range;
+ }
+ }
+
+ return 0;
+}
+
+std::pair<void*, void*> GetVdsoRange()
+{
+ std::pair<void*, void*> vdsoRange = {0, 0};
+ dl_iterate_phdr(OnVdsoPhdr, &vdsoRange);
+ return vdsoRange;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static int OnBuildIdPhdr(struct dl_phdr_info *info, size_t /*size*/, void *data)
+{
+ auto buildId = reinterpret_cast<std::optional<TString>*>(data);
+
+ TString objectName = info->dlpi_name;
+ if (objectName.empty()) {
+ *buildId = TSymbolIndex::GetBuildId(info);
+ }
+
+ return 0;
+}
+
+std::optional<TString> GetBuildId()
+{
+ std::optional<TString> buildId;
+ dl_iterate_phdr(OnBuildIdPhdr, &buildId);
+ return buildId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void Symbolize(NProto::Profile* profile, bool filesOnly)
+{
+ TSymbolIndexSymbolizer symbolizer(profile);
+ symbolizer.Symbolize(filesOnly);
+ return;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo)
+{
+ auto addComment = [&] (const TString& comment) {
+ auto id = profile->string_table_size();
+ profile->add_string_table(comment);
+ profile->add_comment(id);
+ };
+
+ if (!buildInfo.BinaryVersion.empty()) {
+ addComment("binary_version=" + buildInfo.BinaryVersion);
+ }
+ addComment("arc_revision=" + buildInfo.ArcRevision);
+ addComment("build_type=" + buildInfo.BuildType);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/symbolize.h b/yt/yt/library/ytprof/symbolize.h
new file mode 100644
index 00000000000..6d99b56484a
--- /dev/null
+++ b/yt/yt/library/ytprof/symbolize.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <memory>
+#include <optional>
+
+#include <util/generic/string.h>
+
+#include <yt/yt/library/ytprof/profile.pb.h>
+
+#include "build_info.h"
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void Symbolize(NProto::Profile* profile, bool filesOnly = false);
+
+void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo);
+
+std::pair<void*, void*> GetVdsoRange();
+
+// Returns current binary build id as binary string.
+std::optional<TString> GetBuildId();
+
+// Returns version of profiler library.
+TString GetVersion();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/symbolize_other.cpp b/yt/yt/library/ytprof/symbolize_other.cpp
new file mode 100644
index 00000000000..49f216fd977
--- /dev/null
+++ b/yt/yt/library/ytprof/symbolize_other.cpp
@@ -0,0 +1,35 @@
+#include "symbolize.h"
+#include "util/system/compiler.h"
+
+namespace NYT::NYTProf {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void Symbolize(NProto::Profile* profile, bool filesOnly)
+{
+ Y_UNUSED(profile, filesOnly);
+}
+
+std::pair<void*, void*> GetVdsoRange()
+{
+ return {nullptr, nullptr};
+}
+
+TString GetVersion()
+{
+ return "0.2";
+}
+
+void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo)
+{
+ Y_UNUSED(profile, buildInfo);
+}
+
+std::optional<TString> GetBuildId()
+{
+ return {};
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTProf
diff --git a/yt/yt/library/ytprof/ya.make b/yt/yt/library/ytprof/ya.make
new file mode 100644
index 00000000000..79b92eb7153
--- /dev/null
+++ b/yt/yt/library/ytprof/ya.make
@@ -0,0 +1,74 @@
+LIBRARY()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ profile.proto
+ signal_safe_profiler.cpp
+ cpu_profiler.cpp
+ heap_profiler.cpp
+ spinlock_profiler.cpp
+ profile.cpp
+ build_info.cpp
+ external_pprof.cpp
+)
+
+IF (OS_LINUX)
+ SRCS(symbolize.cpp)
+ELSE()
+ SRCS(symbolize_other.cpp)
+ENDIF()
+
+PEERDIR(
+ library/cpp/yt/memory
+ library/cpp/yt/threading
+ library/cpp/yt/backtrace/cursors/interop
+ library/cpp/yt/backtrace/cursors/frame_pointer
+ library/cpp/yt/backtrace/cursors/libunwind
+ yt/yt/library/ytprof/api
+ contrib/libs/libunwind
+ contrib/libs/tcmalloc/malloc_extension
+ library/cpp/svnversion
+ yt/yt/core
+)
+
+IF (NOT OPENSOURCE)
+ PEERDIR(
+ yt/yt/library/ytprof/bundle
+ )
+ENDIF()
+
+IF (OS_SDK == "local")
+ CXXFLAGS(-DYT_NO_AUXV)
+ENDIF()
+
+IF (BUILD_TYPE == "PROFILE")
+ CXXFLAGS(-DYTPROF_PROFILE_BUILD)
+ENDIF()
+
+CXXFLAGS(-DYTPROF_BUILD_TYPE='\"${BUILD_TYPE}\"')
+
+END()
+
+RECURSE(
+ http
+ example
+ bundle
+)
+
+IF (OS_LINUX)
+ RECURSE(
+ integration
+ )
+
+ IF (NOT OPENSOURCE)
+ RECURSE(
+ benchmark
+ )
+ ENDIF()
+
+ IF (NOT SANITIZER_TYPE AND NOT YT_TEAMCITY)
+ RECURSE(unittests)
+ ENDIF()
+ENDIF()
+