diff options
author | ni-stoiko <ni-stoiko@yandex-team.com> | 2023-08-24 21:33:16 +0300 |
---|---|---|
committer | ni-stoiko <ni-stoiko@yandex-team.com> | 2023-08-24 22:18:11 +0300 |
commit | f5cce2b33840e0f8eaf4b36e78bcf0bb6aaffb34 (patch) | |
tree | dc0dd0491c5ed4356437966a480b9f4cd7d72e10 | |
parent | b1210bd96e66a5e7c66cc3c602e3062157170222 (diff) | |
download | ydb-f5cce2b33840e0f8eaf4b36e78bcf0bb6aaffb34.tar.gz |
YT-19720: Using TraceContext AllocationTags in RPC
Attempt to use AllocationTag in RPC
42 files changed, 3462 insertions, 225 deletions
diff --git a/yt/yt/core/rpc/client-inl.h b/yt/yt/core/rpc/client-inl.h index f90de1f7171..d2f83cbb11e 100644 --- a/yt/yt/core/rpc/client-inl.h +++ b/yt/yt/core/rpc/client-inl.h @@ -54,11 +54,13 @@ TFuture<typename TResponse::TResult> TTypedClientRequest<TRequestMessage, TRespo auto context = CreateClientContext(); auto requestAttachmentsStream = context->GetRequestAttachmentsStream(); auto responseAttachmentsStream = context->GetResponseAttachmentsStream(); + typename TResponse::TResult response; { - TMemoryTagGuard guard(context->GetResponseMemoryTag()); + auto traceContextGuard = NTracing::TCurrentTraceContextGuard(context->GetTraceContext()); response = New<TResponse>(std::move(context)); } + auto promise = response->GetPromise(); auto requestControl = Send(std::move(response)); if (requestControl) { @@ -126,7 +128,7 @@ void TTypedClientResponse<TResponseMessage>::SetPromise(const TError& error) template <class TResponseMessage> bool TTypedClientResponse<TResponseMessage>::TryDeserializeBody(TRef data, std::optional<NCompression::ECodec> codecId) { - TMemoryTagGuard guard(ClientContext_->GetResponseMemoryTag()); + auto traceContextGuard = NTracing::TCurrentTraceContextGuard(ClientContext_->GetTraceContext()); return codecId ? TryDeserializeProtoWithCompression(this, data, *codecId) @@ -149,7 +151,6 @@ TIntrusivePtr<T> TProxyBase::CreateRequest(const TMethodDescriptor& methodDescri request->SetResponseCodec(DefaultResponseCodec_); request->SetEnableLegacyRpcCodecs(DefaultEnableLegacyRpcCodecs_); request->SetMultiplexingBand(methodDescriptor.MultiplexingBand); - request->SetResponseMemoryTag(GetCurrentMemoryTag()); if (methodDescriptor.StreamingEnabled) { request->ClientAttachmentsStreamingParameters() = diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp index 89621fb68e2..fb20e7a2f3d 100644 --- a/yt/yt/core/rpc/client.cpp +++ b/yt/yt/core/rpc/client.cpp @@ -40,8 +40,7 @@ TClientContext::TClientContext( TFeatureIdFormatter featureIdFormatter, bool responseIsHeavy, TAttachmentsOutputStreamPtr requestAttachmentsStream, - TAttachmentsInputStreamPtr responseAttachmentsStream, - TMemoryTag responseMemoryTag) + TAttachmentsInputStreamPtr responseAttachmentsStream) : RequestId_(requestId) , TraceContext_(std::move(traceContext)) , Service_(std::move(service)) @@ -50,7 +49,6 @@ TClientContext::TClientContext( , ResponseHeavy_(responseIsHeavy) , RequestAttachmentsStream_(std::move(requestAttachmentsStream)) , ResponseAttachmentsStream_(std::move(responseAttachmentsStream)) - , ResponseMemoryTag_(responseMemoryTag) { } //////////////////////////////////////////////////////////////////////////////// @@ -349,8 +347,7 @@ TClientContextPtr TClientRequest::CreateClientContext() FeatureIdFormatter_, ResponseHeavy_, RequestAttachmentsStream_, - ResponseAttachmentsStream_, - GetResponseMemoryTag().value_or(GetCurrentMemoryTag())); + ResponseAttachmentsStream_); } void TClientRequest::OnPullRequestAttachmentsStream() diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h index dca8836b2bc..25479128d69 100644 --- a/yt/yt/core/rpc/client.h +++ b/yt/yt/core/rpc/client.h @@ -104,7 +104,6 @@ public: DEFINE_BYVAL_RO_PROPERTY(bool, ResponseHeavy); DEFINE_BYVAL_RO_PROPERTY(TAttachmentsOutputStreamPtr, RequestAttachmentsStream); DEFINE_BYVAL_RO_PROPERTY(TAttachmentsInputStreamPtr, ResponseAttachmentsStream); - DEFINE_BYVAL_RO_PROPERTY(TMemoryTag, ResponseMemoryTag); public: TClientContext( @@ -115,8 +114,7 @@ public: TFeatureIdFormatter featureIdFormatter, bool heavy, TAttachmentsOutputStreamPtr requestAttachmentsStream, - TAttachmentsInputStreamPtr responseAttachmentsStream, - TMemoryTag responseMemoryTag); + TAttachmentsInputStreamPtr responseAttachmentsStream); }; DEFINE_REFCOUNTED_TYPE(TClientContext) @@ -139,7 +137,6 @@ public: DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, ResponseCodec, NCompression::ECodec::None); DEFINE_BYVAL_RW_PROPERTY(bool, EnableLegacyRpcCodecs, true); DEFINE_BYVAL_RW_PROPERTY(bool, GenerateAttachmentChecksums, true); - DEFINE_BYVAL_RW_PROPERTY(std::optional<TMemoryTag>, ResponseMemoryTag); DEFINE_BYVAL_RW_PROPERTY(IMemoryReferenceTrackerPtr, MemoryReferenceTracker); // For testing purposes only. DEFINE_BYVAL_RW_PROPERTY(std::optional<TDuration>, SendDelay); diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp index 1384439803f..a881ac49a4e 100644 --- a/yt/yt/core/rpc/helpers.cpp +++ b/yt/yt/core/rpc/helpers.cpp @@ -425,14 +425,23 @@ TTraceContextPtr GetOrCreateHandlerTraceContext( TTraceContextPtr CreateCallTraceContext(std::string service, std::string method) { - auto context = TryGetCurrentTraceContext(); - if (!context) { + auto oldTraceContext = TryGetCurrentTraceContext(); + if (!oldTraceContext) { return nullptr; } - if (!context->IsRecorded()) { - return context; + if (!oldTraceContext->IsRecorded()) { + return oldTraceContext; } - return context->CreateChild(ConcatToString(TStringBuf("RpcClient:"), service, TStringBuf("."), method)); + + auto traceContext = oldTraceContext->CreateChild(Format("RpcClient:%v.%v", service, method)); + + traceContext->SetAllocationTagsPtr(oldTraceContext->GetAllocationTagsPtr()); + + if (GetCurrentMemoryTag() && !traceContext->FindAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral)) { + traceContext->SetAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral, GetCurrentMemoryTag()); + } + + return traceContext; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/allocation_tags/ya.make b/yt/yt/core/rpc/unittests/allocation_tags/ya.make new file mode 100644 index 00000000000..722b2a98721 --- /dev/null +++ b/yt/yt/core/rpc/unittests/allocation_tags/ya.make @@ -0,0 +1,29 @@ +GTEST(core-rpc-allocation-tags) + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +IF (OS_LINUX) + ALLOCATOR(TCMALLOC_256K) +ENDIF() + +PROTO_NAMESPACE(yt) + +SRCS( + yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp +) + +INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc) + +PEERDIR( + yt/yt/library/ytprof + yt/yt/library/profiling + yt/yt/core + yt/yt/core/rpc/grpc + yt/yt/core/rpc/unittests/lib + yt/yt/core/test_framework + library/cpp/testing/common +) + +SIZE(MEDIUM) + +END() diff --git a/yt/yt/core/rpc/unittests/bin/main.cpp b/yt/yt/core/rpc/unittests/bin/main.cpp index 1e7103c0409..2b3e4f2c9e7 100644 --- a/yt/yt/core/rpc/unittests/bin/main.cpp +++ b/yt/yt/core/rpc/unittests/bin/main.cpp @@ -6,7 +6,7 @@ #include <yt/yt/core/concurrency/thread_pool.h> -#include <yt/yt/core/rpc/unittests/lib/my_service.h> +#include <yt/yt/core/rpc/unittests/lib/test_service.h> using namespace NYT; using namespace NYT::NBus; @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) auto server = CreateBusServer(busServer); auto workerPool = CreateThreadPool(4, "Worker"); - auto service = CreateMyService(workerPool->GetInvoker(), false, /*createChannel*/ {}); + auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {}); server->RegisterService(service); server->Start(); diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp index afc0d7fbb90..a033fa9f198 100644 --- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp +++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp @@ -60,7 +60,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) { auto channel = this->CreateChannel(outerServer.GetAddress()); - TMyProxy proxy(channel); + TTestProxy proxy(channel); auto req = proxy.GetChannelFailureError(); auto error = req->Invoke().Get(); ASSERT_FALSE(error.IsOK()); @@ -79,7 +79,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) ASSERT_FALSE(IsChannelFailureErrorHandled(error)); })); - TMyProxy proxy(channel); + TTestProxy proxy(channel); auto req = proxy.GetChannelFailureError(); auto error = req->Invoke().Get(); ASSERT_FALSE(error.IsOK()); @@ -99,7 +99,7 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) ASSERT_TRUE(IsChannelFailureErrorHandled(error)); })); - TMyProxy proxy(channel); + TTestProxy proxy(channel); auto req = proxy.GetChannelFailureError(); req->set_redirection_address(innerServer.GetAddress()); auto error = req->Invoke().Get(); diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h index e785c71c870..1e00b584c6a 100644 --- a/yt/yt/core/rpc/unittests/lib/common.h +++ b/yt/yt/core/rpc/unittests/lib/common.h @@ -31,7 +31,7 @@ #include <yt/yt/core/rpc/service_detail.h> #include <yt/yt/core/rpc/stream.h> -#include <yt/yt/core/rpc/unittests/lib/my_service.h> +#include <yt/yt/core/rpc/unittests/lib/test_service.h> #include <yt/yt/core/rpc/unittests/lib/no_baggage_service.h> #include <yt/yt/core/rpc/grpc/config.h> @@ -74,10 +74,9 @@ public: TTestCreateChannelCallback createChannel) { Server_ = server; - MyService_ = CreateMyService(invoker, secure, createChannel); + TestService_ = CreateTestService(invoker, secure, createChannel); NoBaggageService_ = CreateNoBaggageService(invoker); - - Server_->RegisterService(MyService_); + Server_->RegisterService(TestService_); Server_->RegisterService(NoBaggageService_); Server_->Start(); } @@ -102,7 +101,7 @@ protected: NTesting::TPortHolder Port_; TString Address_; - IMyServicePtr MyService_; + ITestServicePtr TestService_; IServicePtr NoBaggageService_; IServerPtr Server_; }; diff --git a/yt/yt/core/rpc/unittests/lib/my_service.h b/yt/yt/core/rpc/unittests/lib/my_service.h deleted file mode 100644 index 69f3fa9c7b3..00000000000 --- a/yt/yt/core/rpc/unittests/lib/my_service.h +++ /dev/null @@ -1,77 +0,0 @@ -#pragma once - -#include <yt/yt/core/rpc/client.h> -#include <yt/yt/core/rpc/service.h> - -#include <yt/yt/core/rpc/unittests/lib/my_service.pb.h> - -namespace NYT::NRpc { - -//////////////////////////////////////////////////////////////////////////////// - -DEFINE_ENUM(EMyFeature, - ((Cool) (0)) - ((Great) (1)) -); - -class TMyProxy - : public TProxyBase -{ -public: - DEFINE_RPC_PROXY(TMyProxy, MyService, - .SetProtocolVersion(1) - .SetFeaturesType<EMyFeature>()); - - DEFINE_RPC_PROXY_METHOD(NMyRpc, SomeCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, PassCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, RegularAttachments); - DEFINE_RPC_PROXY_METHOD(NMyRpc, NullAndEmptyAttachments); - DEFINE_RPC_PROXY_METHOD(NMyRpc, Compression); - DEFINE_RPC_PROXY_METHOD(NMyRpc, DoNothing); - DEFINE_RPC_PROXY_METHOD(NMyRpc, CustomMessageError); - DEFINE_RPC_PROXY_METHOD(NMyRpc, NotRegistered); - DEFINE_RPC_PROXY_METHOD(NMyRpc, SlowCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, SlowCanceledCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, NoReply); - DEFINE_RPC_PROXY_METHOD(NMyRpc, FlakyCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, RequireCoolFeature); - DEFINE_RPC_PROXY_METHOD(NMyRpc, RequestBytesThrottledCall); - DEFINE_RPC_PROXY_METHOD(NMyRpc, StreamingEcho, - .SetStreamingEnabled(true)); - DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerStreamsAborted, - .SetStreamingEnabled(true)); - DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerNotReading, - .SetStreamingEnabled(true)); - DEFINE_RPC_PROXY_METHOD(NMyRpc, ServerNotWriting, - .SetStreamingEnabled(true)); - DEFINE_RPC_PROXY_METHOD(NMyRpc, GetTraceBaggage); - DEFINE_RPC_PROXY_METHOD(NMyRpc, CustomMetadata); - DEFINE_RPC_PROXY_METHOD(NMyRpc, GetChannelFailureError); -}; - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_CLASS(IMyService) - -class IMyService - : public virtual IService -{ -public: - virtual TFuture<void> GetSlowCallCanceled() const = 0; - virtual TFuture<void> GetServerStreamsAborted() const = 0; -}; - -DEFINE_REFCOUNTED_TYPE(IMyService) - -//////////////////////////////////////////////////////////////////////////////// - -using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address)>; - -IMyServicePtr CreateMyService( - IInvokerPtr invoker, - bool secure, - TTestCreateChannelCallback createChannel); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/unittests/lib/my_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index 0503cfc5225..fdc5354ca6f 100644 --- a/yt/yt/core/rpc/unittests/lib/my_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -1,4 +1,4 @@ -#include "my_service.h" +#include "test_service.h" #include <gtest/gtest.h> @@ -20,24 +20,25 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// -class TMyService - : public IMyService +class TTestService + : public ITestService , public TServiceBase { public: - TMyService( + TTestService( IInvokerPtr invoker, bool secure, TTestCreateChannelCallback createChannel) : TServiceBase( invoker, - TMyProxy::GetDescriptor(), + TTestProxy::GetDescriptor(), NLogging::TLogger("Main")) , Secure_(secure) , CreateChannel_(createChannel) { RegisterMethod(RPC_SERVICE_METHOD_DESC(SomeCall)); RegisterMethod(RPC_SERVICE_METHOD_DESC(PassCall)); + RegisterMethod(RPC_SERVICE_METHOD_DESC(AllocationCall)); RegisterMethod(RPC_SERVICE_METHOD_DESC(RegularAttachments)); RegisterMethod(RPC_SERVICE_METHOD_DESC(NullAndEmptyAttachments)); RegisterMethod(RPC_SERVICE_METHOD_DESC(Compression)); @@ -70,10 +71,10 @@ public: RegisterMethod(RPC_SERVICE_METHOD_DESC(GetChannelFailureError)); // NB: NotRegisteredCall is not registered intentionally - DeclareServerFeature(EMyFeature::Great); + DeclareServerFeature(ETestFeature::Great); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, SomeCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, SomeCall) { context->SetRequestInfo(); int a = request->a(); @@ -81,7 +82,7 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, PassCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, PassCall) { context->SetRequestInfo(); WriteAuthenticationIdentityToProto(response, context->GetAuthenticationIdentity()); @@ -90,7 +91,14 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, RegularAttachments) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, AllocationCall) + { + context->SetRequestInfo(); + response->set_allocated_string(TString("r", request->size())); + context->Reply(); + } + + DECLARE_RPC_SERVICE_METHOD(NTestRpc, RegularAttachments) { for (const auto& attachment : request->Attachments()) { auto data = TBlob(); @@ -101,7 +109,7 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, NullAndEmptyAttachments) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, NullAndEmptyAttachments) { const auto& attachments = request->Attachments(); EXPECT_EQ(2u, attachments.size()); @@ -112,7 +120,7 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, Compression) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, Compression) { auto requestCodecId = CheckedEnumCast<NCompression::ECodec>(request->request_codec()); auto serializedRequestBody = SerializeProtoToRefWithCompression(*request, requestCodecId); @@ -133,26 +141,26 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, DoNothing) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, DoNothing) { context->SetRequestInfo(); context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, CustomMessageError) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, CustomMessageError) { context->SetRequestInfo(); context->Reply(TError(NYT::EErrorCode(42), "Some Error")); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, SlowCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCall) { context->SetRequestInfo(); TDelayedExecutor::WaitForDuration(TDuration::Seconds(1)); context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, SlowCanceledCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCanceledCall) { try { context->SetRequestInfo(); @@ -169,15 +177,15 @@ public: return SlowCallCanceled_.ToFuture(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, RequestBytesThrottledCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequestBytesThrottledCall) { context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, NoReply) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, NoReply) { } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, StreamingEcho) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, StreamingEcho) { context->SetRequestInfo(); @@ -215,7 +223,7 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerStreamsAborted) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerStreamsAborted) { context->SetRequestInfo(); @@ -244,7 +252,7 @@ public: ServerStreamsAborted_.Set(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerNotReading) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerNotReading) { context->SetRequestInfo(); @@ -266,7 +274,7 @@ public: } } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, ServerNotWriting) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, ServerNotWriting) { context->SetRequestInfo(); @@ -289,7 +297,7 @@ public: } } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, FlakyCall) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, FlakyCall) { static std::atomic<int> callCount; @@ -302,14 +310,14 @@ public: } } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, RequireCoolFeature) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequireCoolFeature) { context->SetRequestInfo(); - context->ValidateClientFeature(EMyFeature::Cool); + context->ValidateClientFeature(ETestFeature::Cool); context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, GetTraceBaggage) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, GetTraceBaggage) { context->SetRequestInfo(); auto* traceContext = NTracing::TryGetCurrentTraceContext(); @@ -317,7 +325,7 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, CustomMetadata) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, CustomMetadata) { auto customMetadataExt = context->GetRequestHeader().GetExtension(NRpc::NProto::TCustomMetadataExt::custom_metadata_ext); for (const auto& [key, value] : customMetadataExt.entries()) { @@ -326,13 +334,13 @@ public: context->Reply(); } - DECLARE_RPC_SERVICE_METHOD(NMyRpc, GetChannelFailureError) + DECLARE_RPC_SERVICE_METHOD(NTestRpc, GetChannelFailureError) { context->SetRequestInfo(); if (request->has_redirection_address()) { YT_VERIFY(CreateChannel_); - TMyProxy proxy(CreateChannel_(request->redirection_address())); + TTestProxy proxy(CreateChannel_(request->redirection_address())); auto req = proxy.GetChannelFailureError(); context->ReplyFrom(req->Invoke().AsVoid()); } else { @@ -367,12 +375,12 @@ private: //////////////////////////////////////////////////////////////////////////////// -IMyServicePtr CreateMyService( +ITestServicePtr CreateTestService( IInvokerPtr invoker, bool secure, TTestCreateChannelCallback createChannel) { - return New<TMyService>(invoker, secure, createChannel); + return New<TTestService>(invoker, secure, createChannel); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h new file mode 100644 index 00000000000..f03be1a1153 --- /dev/null +++ b/yt/yt/core/rpc/unittests/lib/test_service.h @@ -0,0 +1,78 @@ +#pragma once + +#include <yt/yt/core/rpc/client.h> +#include <yt/yt/core/rpc/service.h> + +#include <yt/yt/core/rpc/unittests/lib/test_service.pb.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(ETestFeature, + ((Cool) (0)) + ((Great) (1)) +); + +class TTestProxy + : public TProxyBase +{ +public: + DEFINE_RPC_PROXY(TTestProxy, TestService, + .SetProtocolVersion(1) + .SetFeaturesType<ETestFeature>()); + + DEFINE_RPC_PROXY_METHOD(NTestRpc, SomeCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, PassCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, AllocationCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, RegularAttachments); + DEFINE_RPC_PROXY_METHOD(NTestRpc, NullAndEmptyAttachments); + DEFINE_RPC_PROXY_METHOD(NTestRpc, Compression); + DEFINE_RPC_PROXY_METHOD(NTestRpc, DoNothing); + DEFINE_RPC_PROXY_METHOD(NTestRpc, CustomMessageError); + DEFINE_RPC_PROXY_METHOD(NTestRpc, NotRegistered); + DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCanceledCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, NoReply); + DEFINE_RPC_PROXY_METHOD(NTestRpc, FlakyCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, RequireCoolFeature); + DEFINE_RPC_PROXY_METHOD(NTestRpc, RequestBytesThrottledCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, StreamingEcho, + .SetStreamingEnabled(true)); + DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerStreamsAborted, + .SetStreamingEnabled(true)); + DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerNotReading, + .SetStreamingEnabled(true)); + DEFINE_RPC_PROXY_METHOD(NTestRpc, ServerNotWriting, + .SetStreamingEnabled(true)); + DEFINE_RPC_PROXY_METHOD(NTestRpc, GetTraceBaggage); + DEFINE_RPC_PROXY_METHOD(NTestRpc, CustomMetadata); + DEFINE_RPC_PROXY_METHOD(NTestRpc, GetChannelFailureError); +}; + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(ITestService) + +class ITestService + : public virtual IService +{ +public: + virtual TFuture<void> GetSlowCallCanceled() const = 0; + virtual TFuture<void> GetServerStreamsAborted() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(ITestService) + +//////////////////////////////////////////////////////////////////////////////// + +using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address)>; + +ITestServicePtr CreateTestService( + IInvokerPtr invoker, + bool secure, + TTestCreateChannelCallback createChannel); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/unittests/lib/my_service.proto b/yt/yt/core/rpc/unittests/lib/test_service.proto index 5997d917d22..ebb62f4eb55 100644 --- a/yt/yt/core/rpc/unittests/lib/my_service.proto +++ b/yt/yt/core/rpc/unittests/lib/test_service.proto @@ -1,8 +1,8 @@ -package NMyRpc; +package NTestRpc; import "yt_proto/yt/core/misc/proto/guid.proto"; -option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/rpc/unittests;myservice"; +option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/rpc/unittests;testservice"; //////////////////////////////////////////////////////////////////////////////// @@ -33,6 +33,18 @@ message TRspPassCall //////////////////////////////////////////////////////////////////////////////// +message TReqAllocationCall +{ + required int64 size = 1; +} + +message TRspAllocationCall +{ + optional bytes allocated_string = 1; +} + +//////////////////////////////////////////////////////////////////////////////// + message TReqRegularAttachments { } diff --git a/yt/yt/core/rpc/unittests/lib/ya.make b/yt/yt/core/rpc/unittests/lib/ya.make index a2186cf413c..68e2961b799 100644 --- a/yt/yt/core/rpc/unittests/lib/ya.make +++ b/yt/yt/core/rpc/unittests/lib/ya.make @@ -4,8 +4,8 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( common.cpp - my_service.cpp - my_service.proto + test_service.cpp + test_service.proto no_baggage_service.cpp no_baggage_service.proto ) diff --git a/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp b/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp index ffe6a075772..5c1bb6024c6 100644 --- a/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp +++ b/yt/yt/core/rpc/unittests/roaming_channel_ut.cpp @@ -147,7 +147,7 @@ TYPED_TEST(TRpcTest, RoamingChannelNever) { auto channel = CreateRoamingChannel(New<TOneChannelProvider>(CreateRoamingChannel(New<TNeverProvider>()))); - TMyProxy proxy(std::move(channel)); + TTestProxy proxy(std::move(channel)); auto req = proxy.SomeCall(); req->set_a(42); @@ -167,7 +167,7 @@ TYPED_TEST(TRpcTest, RoamingChannelManual) auto manualProviderWeak = MakeWeak(manualProvider); manualProvider.Reset(); - TMyProxy proxy(std::move(channel)); + TTestProxy proxy(std::move(channel)); auto req = proxy.SomeCall(); req->set_a(42); auto asyncRspOrError = req->Invoke() diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp new file mode 100644 index 00000000000..b78d4aa79e2 --- /dev/null +++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp @@ -0,0 +1,86 @@ +#include <yt/yt/core/rpc/unittests/lib/common.h> + +#include <yt/yt/library/ytprof/heap_profiler.h> + +#if defined(_linux_) +#include <tcmalloc/common.h> +#endif + +namespace NYT::NRpc { +namespace { + +using namespace NTracing; +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +#if !defined(_asan_enabled_) && !defined(_msan_enabled_) && defined(_linux_) + +template <class TImpl> +using TRpcTest = TTestBase<TImpl>; +TYPED_TEST_SUITE(TRpcTest, TAllTransports); + +TYPED_TEST(TRpcTest, ResponseWithAllocationTags) +{ + static TMemoryTag testMemoryTag = 1 << 20; + testMemoryTag++; + + NYTProf::EnableMemoryProfilingTags(); + + auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag); + + auto actionQueue = New<TActionQueue>(); + + using TRspPtr = typename TTestProxy::TRspAllocationCallPtr; + std::vector<TFuture<TRspPtr>> responses; + + TTestProxy proxy(this->CreateChannel()); + + constexpr auto size = 1_MB; + for (int i = 0; i < 10; ++i) { + auto context = CreateTraceContextFromCurrent("ResponseWithAllocationTags"); + auto contextGuard = TTraceContextGuard(context); + context->SetAllocationTag(MemoryTagLiteral, testMemoryTag); + + auto req1 = proxy.AllocationCall(); + req1->set_size(size); + + auto rspFutureNoProp = req1->Invoke() + .Apply(BIND_NO_PROPAGATE([] (const TRspPtr& res) { + EXPECT_EQ(TryGetCurrentTraceContext(), nullptr); + return res; + }).AsyncVia(actionQueue->GetInvoker())); + responses.push_back(rspFutureNoProp); + + auto req2 = proxy.AllocationCall(); + req2->set_size(size); + + auto rspFutureProp = req2->Invoke() + .Apply(BIND([testMemoryTag=testMemoryTag] (const TRspPtr& res) { + auto localContext = TryGetCurrentTraceContext(); + EXPECT_NE(localContext, nullptr); + if (localContext) { + EXPECT_EQ(localContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral).value_or(NullMemoryTag), testMemoryTag); + } + return res; + }).AsyncVia(actionQueue->GetInvoker())); + responses.push_back(rspFutureProp); + } + + for (auto& rsp : responses) { + WaitFor(rsp).ValueOrThrow(); + } + + auto memoryUsageAfter = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag]; + auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage; + EXPECT_GE(deltaMemoryUsage, 14_MB) + << "InitialUsage: " << initialMemoryUsage << std::endl + << "After waiting: " << memoryUsageAfter; +} + +#endif + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp b/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp index 2465097b0d1..8691dc4e9a5 100644 --- a/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_shutdown_ut.cpp @@ -19,32 +19,50 @@ Y_TEST_HOOK_BEFORE_RUN(GTEST_YT_RPC_SHUTDOWN) //////////////////////////////////////////////////////////////////////////////// -template <class TMyProxy> +template <class TTestProxy> void TestShutdown(const IChannelPtr& channel) { - TMyProxy proxy(channel); + TTestProxy proxy(channel); - std::vector<NYT::TFuture<typename TTypedClientResponse<NMyRpc::TRspSomeCall>::TResult>> futures; - futures.reserve(100000); - for (int i = 0; i < 100000; ++i) { + constexpr int numberOfRequests = 1000; + std::vector<TFuture<typename TTypedClientResponse<NTestRpc::TRspSomeCall>::TResult>> futures; + futures.reserve(numberOfRequests); + + for (int i = 0; i < numberOfRequests; ++i) { auto req = proxy.SomeCall(); req->SetTimeout(TDuration::Seconds(1)); req->set_a(42); futures.push_back(req->Invoke()); } - NYT::Shutdown(); + Shutdown(TShutdownOptions{TDuration::Seconds(30), true, 127}); + + if (!NYT::IsShutdownStarted()) { + Cerr << "Shutdown was not started" << Endl; + _exit(2); + } + + TFileInput shutdownLogInput((GetOutputPath() / "shutdown.log").GetPath()); + TString buffer; + int exitCode = 1; + + while (shutdownLogInput.ReadLine(buffer)) { + Cerr << buffer << Endl; + if (exitCode && buffer == "*** Shutdown completed") { + exitCode = 0; + } + } - for (auto& future : futures) { - future.Cancel(TError{}); + if (exitCode) { + Cerr << "Shutdown was NOT completed" << Endl; } - _exit(0); + _exit(exitCode); } TYPED_TEST(TRpcShutdownTest, Shutdown) { - EXPECT_EXIT(TestShutdown<TMyProxy>(this->CreateChannel()), testing::ExitedWithCode(0), ""); + EXPECT_EXIT(TestShutdown<TTestProxy>(this->CreateChannel()), ::testing::ExitedWithCode(0), ""); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index f7ff93048a1..63148dadc91 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -18,17 +18,17 @@ class TNonExistingServiceProxy public: DEFINE_RPC_PROXY(TNonExistingServiceProxy, NonExistingService); - DEFINE_RPC_PROXY_METHOD(NMyRpc, DoNothing); + DEFINE_RPC_PROXY_METHOD(NTestRpc, DoNothing); }; -class TMyIncorrectProtocolVersionProxy +class TTestIncorrectProtocolVersionProxy : public TProxyBase { public: - DEFINE_RPC_PROXY(TMyIncorrectProtocolVersionProxy, MyService, + DEFINE_RPC_PROXY(TTestIncorrectProtocolVersionProxy, TestService, .SetProtocolVersion(2)); - DEFINE_RPC_PROXY_METHOD(NMyRpc, SomeCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, SomeCall); }; //////////////////////////////////////////////////////////////////////////////// @@ -57,7 +57,7 @@ TYPED_TEST_SUITE(TGrpcTest, TGrpcOnly); TYPED_TEST(TRpcTest, Send) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SomeCall(); req->set_a(42); auto rspOrError = req->Invoke().Get(); @@ -77,7 +77,7 @@ TYPED_TEST(TRpcTest, RetryingSend) this->CreateChannel()); { - TMyProxy proxy(channel); + TTestProxy proxy(channel); auto req = proxy.FlakyCall(); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError); @@ -92,7 +92,7 @@ TYPED_TEST(TRpcTest, RetryingSend) TYPED_TEST(TRpcTest, UserTag) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.PassCall(); req->SetUser("test-user"); req->SetUserTag("test-user-tag"); @@ -108,7 +108,7 @@ TYPED_TEST(TRpcTest, UserTag) TYPED_TEST(TNotUdsTest, Address) { auto testChannel = [] (IChannelPtr channel) { - TMyProxy proxy(std::move(channel)); + TTestProxy proxy(std::move(channel)); auto req = proxy.SomeCall(); req->set_a(42); auto rspOrError = req->Invoke().Get(); @@ -133,7 +133,7 @@ TYPED_TEST(TNotUdsTest, Address) TYPED_TEST(TNotGrpcTest, SendSimple) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.PassCall(); req->SetUser("test-user"); req->SetMutationId(TGuid::Create()); @@ -149,7 +149,7 @@ TYPED_TEST(TNotGrpcTest, SendSimple) TYPED_TEST(TNotGrpcTest, StreamingEcho) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultRequestCodec(NCompression::ECodec::Lz4); proxy.SetDefaultResponseCodec(NCompression::ECodec::Zstd_1); proxy.SetDefaultEnableLegacyRpcCodecs(false); @@ -223,7 +223,7 @@ TYPED_TEST(TNotGrpcTest, StreamingEcho) TYPED_TEST(TNotGrpcTest, ClientStreamsAborted) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.StreamingEcho(); req->SetTimeout(TDuration::MilliSeconds(100)); @@ -243,20 +243,20 @@ TYPED_TEST(TNotGrpcTest, ClientStreamsAborted) TYPED_TEST(TNotGrpcTest, ServerStreamsAborted) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.ServerStreamsAborted(); req->SetTimeout(TDuration::MilliSeconds(100)); auto rspOrError = WaitFor(req->Invoke()); EXPECT_EQ(NYT::EErrorCode::Timeout, rspOrError.GetCode()); - WaitFor(this->MyService_->GetServerStreamsAborted()) + WaitFor(this->TestService_->GetServerStreamsAborted()) .ThrowOnError(); } TYPED_TEST(TNotGrpcTest, ClientNotReading) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultServerAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(250); for (auto sleep : {false, true}) { @@ -287,7 +287,7 @@ TYPED_TEST(TNotGrpcTest, ClientNotReading) TYPED_TEST(TNotGrpcTest, ClientNotWriting) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(250); for (auto sleep : {false, true}) { @@ -318,7 +318,7 @@ TYPED_TEST(TNotGrpcTest, ClientNotWriting) TYPED_TEST(TNotGrpcTest, ServerNotReading) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultClientAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(250); for (auto sleep : {false, true}) { @@ -339,13 +339,13 @@ TYPED_TEST(TNotGrpcTest, ServerNotReading) EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode()); } - WaitFor(this->MyService_->GetSlowCallCanceled()) + WaitFor(this->TestService_->GetSlowCallCanceled()) .ThrowOnError(); } TYPED_TEST(TNotGrpcTest, ServerNotWriting) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultClientAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(250); for (auto sleep : {false, true}) { @@ -365,13 +365,13 @@ TYPED_TEST(TNotGrpcTest, ServerNotWriting) EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode()); } - WaitFor(this->MyService_->GetSlowCallCanceled()) + WaitFor(this->TestService_->GetSlowCallCanceled()) .ThrowOnError(); } TYPED_TEST(TNotGrpcTest, LaggyStreamingRequest) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500); proxy.DefaultClientAttachmentsStreamingParameters().WriteTimeout = TDuration::MilliSeconds(500); @@ -394,7 +394,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest) { auto configText = TString(R"({ services = { - MyService = { + TestService = { pending_payloads_timeout = 250; }; }; @@ -402,7 +402,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest) auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText)); this->Server_->Configure(config); - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500); auto start = Now(); @@ -437,7 +437,7 @@ TYPED_TEST(TNotGrpcTest, TraceBaggagePropagation) baggage->Set("key2", "value2"); traceContext->PackBaggage(ConvertToAttributes(baggage)); - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.GetTraceBaggage(); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()); @@ -472,12 +472,12 @@ TYPED_TEST(TRpcTest, ManyAsyncRequests) std::vector<TFuture<void>> asyncResults; - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); for (int i = 0; i < RequestCount; ++i) { auto request = proxy.SomeCall(); request->set_a(i); - auto asyncResult = request->Invoke().Apply(BIND([=] (TMyProxy::TRspSomeCallPtr rsp) { + auto asyncResult = request->Invoke().Apply(BIND([=] (TTestProxy::TRspSomeCallPtr rsp) { EXPECT_EQ(i + 100, rsp->b()); })); asyncResults.push_back(asyncResult); @@ -488,12 +488,12 @@ TYPED_TEST(TRpcTest, ManyAsyncRequests) TYPED_TEST(TRpcTest, RegularAttachments) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.RegularAttachments(); req->Attachments().push_back(TSharedRef::FromString("Hello")); req->Attachments().push_back(TSharedRef::FromString("from")); - req->Attachments().push_back(TSharedRef::FromString("TMyProxy")); + req->Attachments().push_back(TSharedRef::FromString("TTestProxy")); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()); @@ -503,12 +503,12 @@ TYPED_TEST(TRpcTest, RegularAttachments) EXPECT_EQ(3u, attachments.size()); EXPECT_EQ("Hello_", StringFromSharedRef(attachments[0])); EXPECT_EQ("from_", StringFromSharedRef(attachments[1])); - EXPECT_EQ("TMyProxy_", StringFromSharedRef(attachments[2])); + EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2])); } TYPED_TEST(TRpcTest, NullAndEmptyAttachments) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.NullAndEmptyAttachments(); req->Attachments().push_back(TSharedRef()); @@ -537,7 +537,7 @@ TYPED_TEST(TNotGrpcTest, Compression) "According to all known laws of aviation, there is no way that a bee should be able to fly." }); - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultRequestCodec(requestCodecId); proxy.SetDefaultResponseCodec(responseCodecId); proxy.SetDefaultEnableLegacyRpcCodecs(false); @@ -579,16 +579,16 @@ TYPED_TEST(TRpcTest, ResponseMemoryTag) testMemoryTag++; auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag); - std::vector<TMyProxy::TRspPassCallPtr> rsps; + std::vector<TTestProxy::TRspPassCallPtr> rsps; { - TMyProxy proxy(this->CreateChannel()); - TString longString(100, 'a'); + TTestProxy proxy(this->CreateChannel()); + TString userName("user"); TMemoryTagGuard guard(testMemoryTag); - for (int i = 0; i < 10000; ++i) { + for (int i = 0; i < 1000; ++i) { auto req = proxy.PassCall(); - req->SetUser(longString); + req->SetUser(userName); req->SetMutationId(TGuid::Create()); req->SetRetry(false); auto err = req->Invoke().Get(); @@ -597,7 +597,7 @@ TYPED_TEST(TRpcTest, ResponseMemoryTag) } auto currentMemoryUsage = GetMemoryUsageForTag(testMemoryTag); - EXPECT_GE(currentMemoryUsage - initialMemoryUsage, 500'000u) + EXPECT_GE(currentMemoryUsage - initialMemoryUsage, 256_KB) << "InitialUsage: " << initialMemoryUsage << std::endl << "Current: " << currentMemoryUsage; } @@ -608,7 +608,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) { auto configText = TString(R"({ services = { - MyService = { + TestService = { methods = { RequestBytesThrottledCall = { request_bytes_throttler = { @@ -622,7 +622,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText)); this->Server_->Configure(config); - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto makeCall = [&] { auto req = proxy.RequestBytesThrottledCall(); @@ -644,7 +644,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) TYPED_TEST(TRpcTest, OK) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.DoNothing(); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()); @@ -652,7 +652,7 @@ TYPED_TEST(TRpcTest, OK) TYPED_TEST(TRpcTest, NoAck) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.DoNothing(); req->SetAcknowledgementTimeout(std::nullopt); auto rspOrError = req->Invoke().Get(); @@ -661,7 +661,7 @@ TYPED_TEST(TRpcTest, NoAck) TYPED_TEST(TRpcTest, TransportError) { - TMyProxy proxy(this->CreateChannel("localhost:9999")); + TTestProxy proxy(this->CreateChannel("localhost:9999")); auto req = proxy.DoNothing(); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode()); @@ -677,7 +677,7 @@ TYPED_TEST(TRpcTest, NoService) TYPED_TEST(TRpcTest, NoMethod) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.NotRegistered(); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NRpc::EErrorCode::NoSuchMethod, rspOrError.GetCode()); @@ -686,7 +686,7 @@ TYPED_TEST(TRpcTest, NoMethod) // NB: Realms are not supported in RPC over GRPC. TYPED_TEST(TNotGrpcTest, NoSuchRealm) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.DoNothing(); ToProto(req->Header().mutable_realm_id(), TGuid::FromString("1-2-3-4")); auto rspOrError = req->Invoke().Get(); @@ -696,7 +696,7 @@ TYPED_TEST(TNotGrpcTest, NoSuchRealm) TYPED_TEST(TRpcTest, ClientTimeout) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultTimeout(TDuration::Seconds(0.5)); auto req = proxy.SlowCall(); auto rspOrError = req->Invoke().Get(); @@ -705,18 +705,18 @@ TYPED_TEST(TRpcTest, ClientTimeout) TYPED_TEST(TRpcTest, ServerTimeout) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultTimeout(TDuration::Seconds(0.5)); auto req = proxy.SlowCanceledCall(); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(this->CheckTimeoutCode(rspOrError.GetCode())); - WaitFor(this->MyService_->GetSlowCallCanceled()) + WaitFor(this->TestService_->GetSlowCallCanceled()) .ThrowOnError(); } TYPED_TEST(TRpcTest, ClientCancel) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCanceledCall(); auto asyncRspOrError = req->Invoke(); Sleep(TDuration::Seconds(0.5)); @@ -726,13 +726,13 @@ TYPED_TEST(TRpcTest, ClientCancel) EXPECT_TRUE(asyncRspOrError.IsSet()); auto rspOrError = asyncRspOrError.Get(); EXPECT_TRUE(this->CheckCancelCode(rspOrError.GetCode())); - WaitFor(this->MyService_->GetSlowCallCanceled()) + WaitFor(this->TestService_->GetSlowCallCanceled()) .ThrowOnError(); } TYPED_TEST(TRpcTest, SlowCall) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultTimeout(TDuration::Seconds(2.0)); auto req = proxy.SlowCall(); auto rspOrError = req->Invoke().Get(); @@ -741,7 +741,7 @@ TYPED_TEST(TRpcTest, SlowCall) TYPED_TEST(TRpcTest, RequestQueueSizeLimit) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); std::vector<TFuture<void>> futures; for (int i = 0; i < 30; ++i) { auto req = proxy.SlowCall(); @@ -757,7 +757,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) TYPED_TEST(TRpcTest, ConcurrencyLimit) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); std::vector<TFuture<void>> futures; for (int i = 0; i < 10; ++i) { auto req = proxy.SlowCall(); @@ -782,7 +782,7 @@ TYPED_TEST(TRpcTest, ConcurrencyLimit) TYPED_TEST(TRpcTest, NoReply) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.NoReply(); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NRpc::EErrorCode::Unavailable, rspOrError.GetCode()); @@ -790,7 +790,7 @@ TYPED_TEST(TRpcTest, NoReply) TYPED_TEST(TRpcTest, CustomErrorMessage) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.CustomMessageError(); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NYT::EErrorCode(42), rspOrError.GetCode()); @@ -799,7 +799,7 @@ TYPED_TEST(TRpcTest, CustomErrorMessage) TYPED_TEST(TRpcTest, ConnectionLost) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCanceledCall(); auto asyncRspOrError = req->Invoke(); @@ -814,13 +814,13 @@ TYPED_TEST(TRpcTest, ConnectionLost) EXPECT_TRUE(asyncRspOrError.IsSet()); auto rspOrError = asyncRspOrError.Get(); EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode()); - WaitFor(this->MyService_->GetSlowCallCanceled()) + WaitFor(this->TestService_->GetSlowCallCanceled()) .ThrowOnError(); } TYPED_TEST(TNotGrpcTest, ProtocolVersionMismatch) { - TMyIncorrectProtocolVersionProxy proxy(this->CreateChannel()); + TTestIncorrectProtocolVersionProxy proxy(this->CreateChannel()); auto req = proxy.SomeCall(); req->set_a(42); auto rspOrError = req->Invoke().Get(); @@ -829,57 +829,57 @@ TYPED_TEST(TNotGrpcTest, ProtocolVersionMismatch) TYPED_TEST(TNotGrpcTest, RequiredServerFeatureSupported) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.PassCall(); - req->RequireServerFeature(EMyFeature::Great); + req->RequireServerFeature(ETestFeature::Great); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError); } TYPED_TEST(TNotGrpcTest, RequiredServerFeatureNotSupported) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.PassCall(); - req->RequireServerFeature(EMyFeature::Cool); + req->RequireServerFeature(ETestFeature::Cool); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NRpc::EErrorCode::UnsupportedServerFeature, rspOrError.GetCode()); - EXPECT_EQ(static_cast<int>(EMyFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey)); - EXPECT_EQ(ToString(EMyFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey)); + EXPECT_EQ(static_cast<int>(ETestFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey)); + EXPECT_EQ(ToString(ETestFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey)); } TYPED_TEST(TNotGrpcTest, RequiredClientFeatureSupported) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.RequireCoolFeature(); - req->DeclareClientFeature(EMyFeature::Cool); + req->DeclareClientFeature(ETestFeature::Cool); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(rspOrError.IsOK()) << ToString(rspOrError); } TYPED_TEST(TNotGrpcTest, RequiredClientFeatureNotSupported) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.RequireCoolFeature(); - req->DeclareClientFeature(EMyFeature::Great); + req->DeclareClientFeature(ETestFeature::Great); auto rspOrError = req->Invoke().Get(); EXPECT_EQ(NRpc::EErrorCode::UnsupportedClientFeature, rspOrError.GetCode()); - EXPECT_EQ(static_cast<int>(EMyFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey)); - EXPECT_EQ(ToString(EMyFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey)); + EXPECT_EQ(static_cast<int>(ETestFeature::Cool), rspOrError.Attributes().Get<int>(FeatureIdAttributeKey)); + EXPECT_EQ(ToString(ETestFeature::Cool), rspOrError.Attributes().Get<TString>(FeatureNameAttributeKey)); } TYPED_TEST(TRpcTest, StopWithoutActiveRequests) { - auto stopResult = this->MyService_->Stop(); + auto stopResult = this->TestService_->Stop(); EXPECT_TRUE(stopResult.IsSet()); } TYPED_TEST(TRpcTest, StopWithActiveRequests) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCall(); auto reqResult = req->Invoke(); Sleep(TDuration::Seconds(0.5)); - auto stopResult = this->MyService_->Stop(); + auto stopResult = this->TestService_->Stop(); EXPECT_FALSE(stopResult.IsSet()); EXPECT_TRUE(reqResult.Get().IsOK()); Sleep(TDuration::Seconds(0.5)); @@ -888,9 +888,9 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests) TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop) { - auto stopResult = this->MyService_->Stop(); + auto stopResult = this->TestService_->Stop(); EXPECT_TRUE(stopResult.IsSet()); - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCall(); auto reqResult = req->Invoke(); EXPECT_FALSE(reqResult.Get().IsOK()); @@ -898,7 +898,7 @@ TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop) TYPED_TEST(TRpcTest, CustomMetadata) { - TMyProxy proxy(this->CreateChannel()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.CustomMetadata(); NYT::NRpc::NProto::TCustomMetadataExt customMetadataExt; (*customMetadataExt.mutable_entries())["key1"] = "value1"; @@ -915,7 +915,7 @@ TYPED_TEST(TGrpcTest, SendMessageLimit) { THashMap<TString, NYTree::INodePtr> arguments; arguments["grpc.max_send_message_length"] = NYT::NYTree::ConvertToNode(1); - TMyProxy proxy(this->CreateChannel(std::nullopt, std::move(arguments))); + TTestProxy proxy(this->CreateChannel(std::nullopt, std::move(arguments))); auto req = proxy.SomeCall(); req->set_a(42); auto error = req->Invoke().Get(); diff --git a/yt/yt/core/rpc/unittests/ya.make b/yt/yt/core/rpc/unittests/ya.make index cbaa5e58c6d..9107d2628cf 100644 --- a/yt/yt/core/rpc/unittests/ya.make +++ b/yt/yt/core/rpc/unittests/ya.make @@ -7,4 +7,5 @@ RECURSE( RECURSE_FOR_TESTS( main shutdown + allocation_tags ) diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index bb88ff60f80..7aedfb5dfec 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -304,6 +304,16 @@ TAllocationTagsPtr TTraceContext::GetAllocationTagsPtr() const noexcept return AllocationTags_; } +void TTraceContext::SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept +{ + auto writerGuard = WriterGuard(AllocationTagsLock_); + + // Local guard for setting RefCounted AllocationTags_. + auto guard = Guard(AllocationTagsAsRefCountedLock_); + + AllocationTags_ = std::move(allocationTags); +} + void TTraceContext::DoSetAllocationTags(TAllocationTags::TTags&& tags) { VERIFY_SPINLOCK_AFFINITY(AllocationTagsLock_); diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index 0a6d6ae6cba..1c83c50b3b1 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -126,6 +126,8 @@ public: TAllocationTagsPtr GetAllocationTagsPtr() const noexcept; + void SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept; + void ClearAllocationTagsPtr() noexcept; template <typename TTag> diff --git a/yt/yt/library/ytprof/README.md b/yt/yt/library/ytprof/README.md new file mode 100644 index 00000000000..b676b29dfb1 --- /dev/null +++ b/yt/yt/library/ytprof/README.md @@ -0,0 +1,251 @@ +# ytprof + +Библиотека YTProf реализует набор семплирующих профайлеров совместимых с инструментом визуализации [pprof](https://github.com/google/pprof/blob/master/doc/README.md). + +Профайлер линкуется в исполняемый файл и запускает HTTP сервер на дебажном порту. HTTP вызовы отдают результат профилирования в формате pprof (сжатый gzip протобуф из [profiler.proto](https://a.yandex-team.ru/arc/trunk/arcadia/yt/yt/library/ytprof/profile.proto)). + +Файл с профилем является self contained, содержит внутри себя всю информацию о символах и номерах строк и не требует оригинального бинаря для просмотра. + +Для удобного просмотра исходного кода, нужно запускать pprof из корня аркадии с опцией `-trim_path='/-S/:/-B/'`. + +## Quick Start + +Работа с профайлером, на примере [example/main.cpp](https://a.yandex-team.ru/arc/trunk/arcadia/yt/yt/library/ytprof/example/main.cpp) + +(*) Если `ya` запускается через враппер `/usr/local/bin/ya`, то в `ya tool pprof` не работает интерактивный режим. Лучше удалить `/usr/local/bin/ya` со своей машины и использовать другой способ запуска. + +``` +# Все команды выполняются из корня аркадии + +# Собираем пример +ya make --build=profile yt/yt/library/ytprof/example + +# Запускаем пример в соседнем терминале на порту 10003 +./yt/yt/library/ytprof/example/example 10003 + +# Смотрим потребление памяти +ya tool pprof -symbolize=none http://localhost:10003/heap +Fetching profile over HTTP from http://localhost:10003/heap +Saved profile in /home/prime/pprof/pprof.example.allocations.space.002.pb.gz +File: example +generated by ytprof 0.2 +Type: space +Entering interactive mode (type "help" for commands, "o" for options) +(pprof) list main +Total: 64.80MB +ROUTINE ======================== main in /home/prime/arc/yt/yt/library/ytprof/example/main.cpp + 0 62.38MB (flat, cum) 96.25% of Total + . . 30: Register(server, ""); + . . 31: server->Start(); + . . 32: + . . 33: THashMap<TString, std::vector<int>> data; + . . 34: for (int i = 0; i < 1024 * 16; i++) { + . 62.38MB 35: data[ToString(i)].resize(1024); + . . 36: } + . . 37: + . . 38: ui64 value = 0; + . . 39: while (true) { + . . 40: THash<TString> hasher; + +# Смотрим потребление CPU в течении 15 секунд +ya tool pprof -symbolize=none 'http://localhost:10003/profile?d=15' +Fetching profile over HTTP from http://localhost:10003/profile?d=15 +Saved profile in /home/prime/pprof/pprof.example.sample.profile.004.pb.gz +File: example +generated by ytprof 0.2 +arc_revision=8788163 +build_type=profile +Type: cpu +Entering interactive mode (type "help" for commands, "o" for options) +(pprof) top +Showing nodes accounting for 11840ms, 78.83% of 15020ms total +Dropped 45 nodes (cum <= 75.10ms) +Showing top 10 nodes out of 53 + flat flat% sum% cum cum% + 5250ms 34.95% 34.95% 5250ms 34.95% (anonymous namespace)::TBasicIntFormatter::Format + 2610ms 17.38% 52.33% 3760ms 25.03% CityHash64 + 1000ms 6.66% 58.99% 1000ms 6.66% ReadUnaligned (inline) + 980ms 6.52% 65.51% 980ms 6.52% memcpyU256 + 530ms 3.53% 69.04% 530ms 3.53% (anonymous namespace)::TIntFormatter::Format + 360ms 2.40% 71.44% 360ms 2.40% TcmallocSlab_Internal_Pop_trampoline_2 + 330ms 2.20% 73.64% 330ms 2.20% TcmallocSlab_Internal_Push_trampoline_7 + 280ms 1.86% 75.50% 280ms 1.86% std::__y1::basic_string::__is_long + 260ms 1.73% 77.23% 4020ms 26.76% NHashPrivate::ComputeStringHash + 240ms 1.60% 78.83% 240ms 1.60% tcmalloc::tcmalloc_internal::PageMap2::sizeclass +(pprof) list main +Total: 15.02s +ROUTINE ======================== main in /home/prime/arc/yt/yt/library/ytprof/example/main.cpp + 150ms 13.99s (flat, cum) 93.14% of Total + . . 1:#include <yt/yt/core/concurrency/poller.h> + . . 2:#include <yt/yt/core/concurrency/thread_pool_poller.h> + . . 3:#include <yt/yt/core/concurrency/action_queue.h> + . . 4:#include <yt/yt/core/http/server.h> + . . 5: + . . 6:#include <yt/yt/library/ytprof/http/handler.h> + . . 7:#include <yt/yt/library/ytprof/heap_profiler.h> + . . 8: + . . 9:#include <absl/debugging/stacktrace.h> + . . 10: + . . 11:using namespace NYT; + . . 12:using namespace NYT::NHttp; + . . 13:using namespace NYT::NConcurrency; + . . 14:using namespace NYT::NYTProf; + . . 15: + . . 16:int main(int argc, char* argv[]) + . . 17:{ + . . 18: absl::SetStackUnwinder(AbslStackUnwinder); + . . 19: tcmalloc::MallocExtension::SetProfileSamplingRate(2_MB); + . . 20: + . . 21: try { + . . 22: if (argc != 2 && argc != 3) { + . . 23: throw yexception() << "usage: " << argv[0] << " PORT"; + . . 24: } + . . 25: + . . 26: auto port = FromString<int>(argv[1]); + . . 27: auto poller = CreateThreadPoolPoller(1, "Example"); + . . 28: auto server = CreateServer(port, poller); + . . 29: + . . 30: Register(server, ""); + . . 31: server->Start(); + . . 32: + . . 33: THashMap<TString, std::vector<int>> data; + . . 34: for (int i = 0; i < 1024 * 16; i++) { + . . 35: data[ToString(i)].resize(1024); + . . 36: } + . . 37: + . . 38: ui64 value = 0; + . . 39: while (true) { + . . 40: THash<TString> hasher; + . . 41: for (int i = 0; i < 10000000; i++) { + . 13.75s 42: value += hasher(ToString(i)); + . . 43: } + . . 44: + . . 45: std::vector<TString> data; + . . 46: for (int i = 0; i < 10000; i++) { + . 80ms 47: data.push_back(TString(1024, 'x')); + . . 48: } + . . 49: + . . 50: if (value == 1) { + . . 51: Sleep(TDuration::Seconds(1)); + . . 52: } + . 10ms 53: } + . . 54: } catch (const std::exception& ex) { + . . 55: Cerr << ex.what() << Endl; + . . 56: _exit(1); + . . 57: } + . . 58: +``` + +## CPU профайлер + +CPU профайлер реализован на основе сигнала SIGPROF. В выключенном состоянии +профайлер никак не влияет на работу программы. В включенном состоянии ядро +начинает посылать сигналы всем тредам процесса. Обработчик сигнала собирает текущий стек и пушит его в lock free очередь. + +Для работы CPU профайлера нужно чтобы исполняемый файл был собран с опцией `--build=profile`. Добавление этой опции может замедлить исполнение на программы на несколько процентов, поэтому рекомендуется совмещать `--build=profile` с флагом `--thinlto`. Такая конфигурация должна работать на пару процентов быстрее, чем обычный `--build=release`. + +### CPU теги + +YTProf поддерживает теги. Теги позволяют фильтровать профиль в pprof с помощью команды +`tagfocus`. Например, можно положить в тег id эксперимента или имя текущего пользователя. + +`TCpuProfilerTagGuard` кладёт тег в TLS треда. Также есть возможность положить тег в `TTraceContext` запроса, чтобы он автоматически пробрасывался сквозь асинхронный код YT файберов. + +### Тяжелые actions + +Профайлер умеет тегировать семплы временем исполнения в YT тред пуле. Это позволяет находить код, который надолго заблокировал какой-то тред. + +- Можно включить теги, чтобы фильтровать в pprof `/profile?record_action_run_time=1`. +- Можно сразу отфильтровать только семплы из долгих экшенов `/profile?action_min_exec_time=1s` + +``` +# После этого, можно отфильтровать семплы командой tagfocus. +(pprof) tagfocus=action_run_time_us=1000000: +# И посмотреть список семплов. +(pprof) traces +``` + +## Memory Profiler + +Для работы memory профайлера нужна поддержка в аллокаторе. На текущий момент, такая +поддержка есть только в tcmalloc. + +Чтобы включить memory profiler, нужно: + + 1. Слинковаться с одной конфигураций tcmalloc, дописав в ya.make своей программы: + ``` + ALLOCATOR(TCMALLOC) + ``` + 2. На раннем этапе инициализации приложения, до того как началось выделение больших пользовательских объектов + написать код конфигурации профайлера. + ```c++ + #include <yt/yt/library/ytprof/heap_profiler.h> + #include <absl/debugging/stacktrace.h> + + int main() { + absl::SetStackUnwinder(AbslStackUnwinder); + tcmalloc::MallocExtension::SetProfileSamplingRate(2_MB); + + ... + } + ``` + +tcmalloc поддерживает 4 вида профилей: +- `heap` - снепшот текущего потребления памяти. Это основной профиль, который вам интересен. +- `peak` - снепшот потребления памяти в момент, когда приложение потребляло максимальный объем памяти. Профиль удобен, чтобы искать внезапный пик потребления, который уже ушёл. +- `allocations` - профиль выделения памяти за интервал. Полезен для того, чтобы оптимизировать производительность. Код, который выделяет и освобождает вектор в горячем цикле, не будет виден в `heap` профиле потому что в каждый момент времени жив всего один блок памяти. Но будет виден в этом профиле. +- `fragmentation` - профиль фрагментации памяти. Small объекты в tcmalloc +выделяются из спанов. Спан - это массив small объектов одинакового размера. Спан не вернётся назад в общий пул памяти, пока не освободят все объекты в нём. В худшем случае, всю память могут занимать такие спаны с одним живым объектом внутри. Этот тип профиля позволяет находить, какие аллокации приводят к подобной проблеме. + +Memory profiler работает в любом типе сборки, кроме сборки с санитайзерами. + +## Spinlock Profiler + +ytprof поддерживает профилирование 2-х видов спинлоков. +- `lock` - профиль спинлоков из `absl`. Эти спинлоки используются внутри tcmalloc. +- `block` - профиль спинлоков из `library/cpp/yt/threading` + +### Быстрые примеры для YT + +``` +# Текущее потребление ноды +curl http://vla0-8040-co-node-arnold.vla.yp-c.yandex.net:10013/ytprof/heap > ../heap.pb.gz +# Пиковое потребение ноды +curl 'http://vla2-8153-node-seneca-vla.vla.yp-c.yandex.net:10012/ytprof/peak' > ../peak.pb.gz +# Профиль аллокаций ноды +curl 'http://vla2-8153-node-seneca-vla.vla.yp-c.yandex.net:10012/ytprof/allocations?d=60' > ../allocations_node.pb.gz +# CPU профиль мастера +curl 'http://m002-hahn.sas.yp-c.yandex.net:10010/ytprof/profile' > ../profile.pb.gz +# CPU профиль шедулера с повышенной частотой +curl 'http://sas5-9718-scheduler-hahn.sas.yp-c.yandex.net:10011/ytprof/profile?freq=1000' > ../profile_1000.pb.gz + +# Смотрим профиль из корня аркадии. trim_path можно поправить, если исходники не находятся. +ya tool pprof -symbolize=none -trim_path='/-S/:/-B/:/home/teamcity/source/Yt_ArcRelease' ../heap.pb.gz +File: ytserver-node +generated by ytprof 0.3 +binary_version=22.1.9091475-stable-ya~b6081b45d6f7f85a +arc_revision=b6081b45d6f7f85ab6f9772f31c277fe01de886c +build_type=profile +Type: space +Entering interactive mode (type "help" for commands, "o" for options) +# Сохраняет svg файл. В нём проще всего найти интересные функции, чтобы потом смотреть на них более подробно +(pprof) svg +# Показывает топ функций. Удобно в cpu, в памяти скорее всего будет показывать функции аллокации +(pprof) top +# Сортирует не по потреблению в функции, а по потреблению в самой функции и всех её детях +(pprof) sort=cum +# Переключает вес семпла на штуки. По дефолту семплы взвешиваются в секундах или мегабайтах +(pprof) sample_index=0 +# Показывает код функции. Параметр - это регулярное выражение +(pprof) list IOutputStream::Write +# Фильтрует семплы по треду +(pprof) tagfocus=thread=StorageHeavy +# Аллокации по размеру +(pprof) tagfocus=allocated_size=2mb: +# Фильтрует экшены, которые бежали больше 10ms +(pprof) tagfocus=action_run_time_us=10000: +# Убирает фильтр +(pprof) tagfocus= +# Показывает сырые семплы +(pprof) traces +``` diff --git a/yt/yt/library/ytprof/build_info.cpp b/yt/yt/library/ytprof/build_info.cpp new file mode 100644 index 00000000000..97f3652485d --- /dev/null +++ b/yt/yt/library/ytprof/build_info.cpp @@ -0,0 +1,34 @@ +#include "build_info.h" + +#include <library/cpp/svnversion/svnversion.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +TBuildInfo TBuildInfo::GetDefault() +{ + TBuildInfo buildInfo; + + buildInfo.BuildType = YTPROF_BUILD_TYPE; + buildInfo.BuildType.to_lower(); // no shouting + + if (GetVCS() == TString{"arc"}) { + buildInfo.ArcRevision = GetProgramCommitId(); + } + + return buildInfo; +} + +bool IsProfileBuild() +{ +#ifdef YTPROF_PROFILE_BUILD + return true; +#else + return false; +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/build_info.h b/yt/yt/library/ytprof/build_info.h new file mode 100644 index 00000000000..c2e8a8ed1e1 --- /dev/null +++ b/yt/yt/library/ytprof/build_info.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/generic/string.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +struct TBuildInfo +{ + // Semantic version of current binary. + // + // Meaning of this field is application specific. Empty by default. + TString BinaryVersion; + + // ArcRevision this binary was built from. + TString ArcRevision; + bool ArcDirty; + + // BuildType this binary was built with. + TString BuildType; + + static TBuildInfo GetDefault(); +}; + +bool IsProfileBuild(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/cpu_profiler.cpp b/yt/yt/library/ytprof/cpu_profiler.cpp new file mode 100644 index 00000000000..e04aefa4249 --- /dev/null +++ b/yt/yt/library/ytprof/cpu_profiler.cpp @@ -0,0 +1,201 @@ +#include "cpu_profiler.h" +#include "symbolize.h" + +#if defined(_linux_) +#include <link.h> +#include <sys/syscall.h> +#include <sys/prctl.h> + +#include <library/cpp/yt/cpu_clock/clock.h> + +#include <library/cpp/yt/backtrace/cursors/frame_pointer/frame_pointer_cursor.h> + +#include <library/cpp/yt/backtrace/cursors/interop/interop.h> + +#include <util/system/yield.h> + +#include <csignal> +#include <functional> +#endif + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +#if not defined(_linux_) + +TCpuProfiler::TCpuProfiler(TCpuProfilerOptions options) + : TSignalSafeProfiler(options) +{ } + +TCpuProfiler::~TCpuProfiler() +{ } + +void TCpuProfiler::EnableProfiler() +{ } + +void TCpuProfiler::DisableProfiler() +{ } + +void TCpuProfiler::AnnotateProfile(NProto::Profile* /* profile */, const std::function<i64(const TString&)>& /* stringify */) +{ } + +i64 TCpuProfiler::EncodeValue(i64 value) +{ + return value; +} + +TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration) +{ + return {}; +} + +#endif + +//////////////////////////////////////////////////////////////////////////////// + +#if defined(_linux_) + +std::atomic<TCpuProfiler*> TCpuProfiler::ActiveProfiler_; +std::atomic<bool> TCpuProfiler::HandlingSigprof_; + +TCpuProfiler::TCpuProfiler(TCpuProfilerOptions options) + : TSignalSafeProfiler(options) + , Options_(options) + , ProfilePeriod_(1000000 / Options_.SamplingFrequency) +{ } + +TCpuProfiler::~TCpuProfiler() +{ + Stop(); +} + +void TCpuProfiler::SigProfHandler(int /* sig */, siginfo_t* info, void* ucontext) +{ + int savedErrno = errno; + + while (HandlingSigprof_.exchange(true)) { + SchedYield(); + } + + auto profiler = ActiveProfiler_.load(); + if (profiler) { + profiler->OnSigProf(info, reinterpret_cast<ucontext_t*>(ucontext)); + } + + HandlingSigprof_.store(false); + + errno = savedErrno; +} + +void TCpuProfiler::EnableProfiler() +{ + TCpuProfiler* expected = nullptr; + if (!ActiveProfiler_.compare_exchange_strong(expected, this)) { + throw yexception() << "Another instance of CPU profiler is running"; + } + + struct sigaction sig; + sig.sa_flags = SA_SIGINFO | SA_RESTART; + sigemptyset(&sig.sa_mask); + sig.sa_sigaction = &TCpuProfiler::SigProfHandler; + + if (sigaction(SIGPROF, &sig, NULL) != 0) { + throw TSystemError(LastSystemError()); + } + + itimerval period; + period.it_value.tv_sec = 0; + period.it_value.tv_usec = 1000000 / Options_.SamplingFrequency; + period.it_interval = period.it_value; + + if (setitimer(ITIMER_PROF, &period, nullptr) != 0) { + throw TSystemError(LastSystemError()); + } +} + +void TCpuProfiler::DisableProfiler() +{ + itimerval period{}; + if (setitimer(ITIMER_PROF, &period, nullptr) != 0) { + throw TSystemError(LastSystemError()); + } + + struct sigaction sig; + sig.sa_flags = SA_RESTART; + sigemptyset(&sig.sa_mask); + sig.sa_handler = SIG_IGN; + + if (sigaction(SIGPROF, &sig, NULL) != 0) { + throw TSystemError(LastSystemError()); + } + + ActiveProfiler_ = nullptr; + while (HandlingSigprof_ > 0) { + SchedYield(); + } +} + +void TCpuProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) +{ + auto sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("sample")); + sampleType->set_unit(stringify("count")); + + sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("cpu")); + sampleType->set_unit(stringify("microseconds")); + + auto periodType = profile->mutable_period_type(); + periodType->set_type(stringify("cpu")); + periodType->set_unit(stringify("microseconds")); + + profile->set_period(1000000 / Options_.SamplingFrequency); + + if (SignalOverruns_ > 0) { + profile->add_comment(stringify("cpu.signal_overruns=" + std::to_string(SignalOverruns_))); + } +} + +i64 TCpuProfiler::EncodeValue(i64 value) +{ + return value; +} + +void TCpuProfiler::OnSigProf(siginfo_t* info, ucontext_t* ucontext) +{ + SignalOverruns_ += info->si_overrun; + + for (const auto& filter : Options_.SampleFilters) { + if (!filter()) { + // This sample is filtered out. + return; + } + } + + auto cursorContext = NBacktrace::FramePointerCursorContextFromUcontext(*ucontext); + NBacktrace::TFramePointerCursor cursor(&Reader_, cursorContext); + + RecordSample(&cursor, ProfilePeriod_); +} + +TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration minExecTime) +{ + auto minCpuDuration = DurationToCpuDuration(minExecTime); + + return [minCpuDuration] () { + auto fiberStartTime = GetTraceContextTimingCheckpoint(); + if (fiberStartTime == 0) { + return false; + } + + auto delta = GetCpuInstant() - fiberStartTime; + return delta > minCpuDuration; + }; +} + +#endif + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/cpu_profiler.h b/yt/yt/library/ytprof/cpu_profiler.h new file mode 100644 index 00000000000..81519ba368b --- /dev/null +++ b/yt/yt/library/ytprof/cpu_profiler.h @@ -0,0 +1,71 @@ +#pragma once + +#include <thread> +#include <array> +#include <variant> + +#if defined(_linux_) +#include <sys/types.h> +#endif + +#include <yt/yt/library/ytprof/profile.pb.h> +#include <yt/yt/library/ytprof/api/api.h> + +#include <library/cpp/yt/memory/intrusive_ptr.h> + +#include <util/generic/hash.h> +#include <util/datetime/base.h> + +#include "queue.h" +#include "signal_safe_profiler.h" + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +class TCpuProfilerOptions + : public TSignalSafeProfilerOptions +{ +public: + int SamplingFrequency = 100; + + using TSampleFilter = std::function<bool()>; + std::vector<TSampleFilter> SampleFilters; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TCpuProfiler + : public TSignalSafeProfiler +{ +public: + explicit TCpuProfiler(TCpuProfilerOptions options = {}); + ~TCpuProfiler(); + +private: +#if defined(_linux_) + const TCpuProfilerOptions Options_; + const i64 ProfilePeriod_; + + static std::atomic<TCpuProfiler*> ActiveProfiler_; + static std::atomic<bool> HandlingSigprof_; + static void SigProfHandler(int sig, siginfo_t* info, void* ucontext); + + std::atomic<i64> SignalOverruns_{0}; + + void OnSigProf(siginfo_t* info, ucontext_t* ucontext); +#endif + + void EnableProfiler() override; + void DisableProfiler() override; + void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override; + i64 EncodeValue(i64 value) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCpuProfilerOptions::TSampleFilter GetActionMinExecTimeFilter(TDuration minExecTime); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/external_pprof.cpp b/yt/yt/library/ytprof/external_pprof.cpp new file mode 100644 index 00000000000..71419f82f94 --- /dev/null +++ b/yt/yt/library/ytprof/external_pprof.cpp @@ -0,0 +1,56 @@ +#include "external_pprof.h" + +#include "profile.h" + +#include <library/cpp/resource/resource.h> + +#include <util/folder/tempdir.h> +#include <util/system/file.h> +#include <util/stream/file.h> + +#include <util/generic/string.h> + +#include <vector> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +void SymbolizeByExternalPProf(NProto::Profile* profile, const TSymbolizationOptions& options) +{ + TTempDir tmpDir = TTempDir::NewTempDir(options.TmpDir); + if (options.KeepTmpDir) { + tmpDir.DoNotRemove(); + } + + auto pprofBinary = NResource::Find("/ytprof/pprof"); + auto llvmSymbolizerBinary = NResource::Find("/ytprof/llvm-symbolizer"); + + auto writeFile = [&] (const TString& name) { + TFile file{tmpDir.Path() / name, EOpenModeFlag::CreateAlways|EOpenModeFlag::WrOnly|EOpenModeFlag::AX|EOpenModeFlag::ARW}; + auto binary = NResource::Find("/ytprof/" + name); + file.Write(binary.data(), binary.size()); + }; + + writeFile("pprof"); + writeFile("llvm-symbolizer"); + + TFileOutput output(tmpDir.Path() / "in.pb.gz"); + WriteProfile(&output, *profile); + output.Finish(); + + options.RunTool(std::vector<TString>{ + tmpDir.Path() / "pprof", + "-proto", + "-output=" + (tmpDir.Path() / "out.pb.gz").GetPath(), + "-tools=" + tmpDir.Path().GetPath(), + (tmpDir.Path() / "in.pb.gz").GetPath() + }); + + TFileInput input(tmpDir.Path() / "out.pb.gz"); + ReadProfile(&input, profile); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/external_pprof.h b/yt/yt/library/ytprof/external_pprof.h new file mode 100644 index 00000000000..d7855a50b1f --- /dev/null +++ b/yt/yt/library/ytprof/external_pprof.h @@ -0,0 +1,24 @@ +#pragma once + +#include <yt/yt/library/ytprof/profile.pb.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +struct TSymbolizationOptions +{ + TString TmpDir = "/tmp"; + + bool KeepTmpDir = false; + + std::function<void(const std::vector<TString>&)> RunTool; +}; + +void SymbolizeByExternalPProf( + NProto::Profile* profile, + const TSymbolizationOptions& options); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/heap_profiler.cpp b/yt/yt/library/ytprof/heap_profiler.cpp new file mode 100644 index 00000000000..2fecf95473d --- /dev/null +++ b/yt/yt/library/ytprof/heap_profiler.cpp @@ -0,0 +1,261 @@ +#include "heap_profiler.h" + +#include "symbolize.h" + +#include <library/cpp/yt/memory/leaky_singleton.h> + +#include <library/cpp/yt/threading/spin_lock.h> + +#include <library/cpp/yt/backtrace/cursors/libunwind/libunwind_cursor.h> + +#include <util/generic/hash_set.h> +#include <util/string/join.h> + +#include <tcmalloc/malloc_extension.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +using namespace NTracing; + +//////////////////////////////////////////////////////////////////////////////// + +Y_WEAK void* CreateAllocationTagsData() +{ + return nullptr; +} + +Y_WEAK void* CopyAllocationTagsData(void* userData) +{ + return userData; +} + +Y_WEAK void DestroyAllocationTagsData(void* /*userData*/) +{ } + +Y_WEAK const std::vector<std::pair<TString, TString>>& ReadAllocationTagsData(void* /*userData*/) +{ + static const std::vector<std::pair<TString, TString>> emptyTags; + return emptyTags; +} + +Y_WEAK std::optional<TString> FindTagValue( + const std::vector<std::pair<TString, TString>>& tags, + const TString& key) +{ + Y_UNUSED(tags); + Y_UNUSED(key); + return ToString(NullMemoryTag); +} + +Y_WEAK void StartAllocationTagsCleanupThread(TDuration /*cleanupInterval*/) +{ } + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +namespace NYT::NYTProf { + +using namespace NThreading; +using namespace NTracing; + +//////////////////////////////////////////////////////////////////////////////// + +NProto::Profile ConvertAllocationProfile(const tcmalloc::Profile& snapshot) +{ + NProto::Profile profile; + profile.add_string_table(); + + auto addString = [&] (TString str) { + auto index = profile.string_table_size(); + profile.add_string_table(str); + return index; + }; + + auto sampleType = profile.add_sample_type(); + sampleType->set_type(addString("allocations")); + sampleType->set_unit(addString("count")); + + sampleType = profile.add_sample_type(); + sampleType->set_type(addString("space")); + + auto bytesUnitId = addString("bytes"); + sampleType->set_unit(bytesUnitId); + + auto periodType = profile.mutable_period_type(); + periodType->set_type(sampleType->type()); + periodType->set_unit(sampleType->unit()); + + profile.set_period(snapshot.Period()); + + auto allocatedSizeId = addString("allocated_size"); + auto requestedSizeId = addString("requested_size"); + auto requestedAlignmentId = addString("requested_alignment"); + + THashMap<void*, ui64> locations; + snapshot.Iterate([&] (const tcmalloc::Profile::Sample& sample) { + auto sampleProto = profile.add_sample(); + sampleProto->add_value(sample.count); + sampleProto->add_value(sample.sum); + + auto allocatedSizeLabel = sampleProto->add_label(); + allocatedSizeLabel->set_key(allocatedSizeId); + allocatedSizeLabel->set_num(sample.allocated_size); + allocatedSizeLabel->set_num_unit(bytesUnitId); + + auto requestedSizeLabel = sampleProto->add_label(); + requestedSizeLabel->set_key(requestedSizeId); + requestedSizeLabel->set_num(sample.requested_size); + requestedSizeLabel->set_num_unit(bytesUnitId); + + auto requestedAlignmentLabel = sampleProto->add_label(); + requestedAlignmentLabel->set_key(requestedAlignmentId); + requestedAlignmentLabel->set_num(sample.requested_alignment); + requestedAlignmentLabel->set_num_unit(bytesUnitId); + + for (int i = 0; i < sample.depth; i++) { + auto ip = sample.stack[i]; + + auto it = locations.find(ip); + if (it != locations.end()) { + sampleProto->add_location_id(it->second); + continue; + } + + auto locationId = locations.size() + 1; + + auto location = profile.add_location(); + location->set_address(reinterpret_cast<ui64>(ip)); + location->set_id(locationId); + + sampleProto->add_location_id(locationId); + locations[ip] = locationId; + } + + // TODO(gepardo): Deduplicate values in string table + for (const auto& [key, value] : ReadAllocationTagsData(sample.user_data)) { + auto label = sampleProto->add_label(); + label->set_key(addString(key)); + label->set_str(addString(value)); + } + }); + + profile.set_drop_frames(addString(JoinSeq("|", { + ".*SampleifyAllocation", + ".*AllocSmall", + "slow_alloc", + "TBasicString::TBasicString", + }))); + + Symbolize(&profile, true); + return profile; +} + +NProto::Profile ReadHeapProfile(tcmalloc::ProfileType profileType) +{ + auto snapshot = tcmalloc::MallocExtension::SnapshotCurrent(profileType); + return ConvertAllocationProfile(snapshot); +} + +THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage() +{ + THashMap<TMemoryTag, ui64> usage; + + auto snapshot = tcmalloc::MallocExtension::SnapshotCurrent(tcmalloc::ProfileType::kHeap); + snapshot.Iterate([&] (const tcmalloc::Profile::Sample& sample) { + auto maybeMemoryTagStr = FindTagValue( + ReadAllocationTagsData(sample.user_data), + MemoryTagLiteral); + + if (maybeMemoryTagStr) { + auto memoryTag = FromString<TMemoryTag>(maybeMemoryTagStr.value()); + if (memoryTag != NullMemoryTag) { + usage[memoryTag] += sample.sum; + } + } + }); + + return usage; +} + +static thread_local TMemoryTag MemoryTag = 0; + +TMemoryTag GetMemoryTag() +{ + return MemoryTag; +} + +TMemoryTag SetMemoryTag(TMemoryTag newTag) +{ + auto oldTag = MemoryTag; + MemoryTag = newTag; + return oldTag; +} + +struct TMemoryUsageSnapshot +{ + TSpinLock Lock; + THashMap<TMemoryTag, ui64> Snapshot; +}; + +void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot) +{ + auto snapshot = LeakySingleton<TMemoryUsageSnapshot>(); + auto guard = Guard(snapshot->Lock); + snapshot->Snapshot = std::move(usageSnapshot); +} + +i64 GetEstimatedMemoryUsage(TMemoryTag tag) +{ + auto snapshot = LeakySingleton<TMemoryUsageSnapshot>(); + auto guard = Guard(snapshot->Lock); + auto it = snapshot->Snapshot.find(tag); + if (it != snapshot->Snapshot.end()) { + return it->second; + } + return 0; +} + +int AbslStackUnwinder( + void** frames, + int*, + int maxFrames, + int skipFrames, + const void*, + int*) +{ + NBacktrace::TLibunwindCursor cursor; + + for (int i = 0; i < skipFrames + 1; ++i) { + cursor.MoveNext(); + } + + int count = 0; + for (int i = 0; i < maxFrames; ++i) { + if (cursor.IsFinished()) { + return count; + } + + // IP point's to return address. Substract 1 to get accurate line information for profiler. + frames[i] = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(cursor.GetCurrentIP()) - 1); + count++; + + cursor.MoveNext(); + } + return count; +} + +void EnableMemoryProfilingTags() +{ + StartAllocationTagsCleanupThread(TDuration::Seconds(1)); + tcmalloc::MallocExtension::SetSampleUserDataCallbacks( + &CreateAllocationTagsData, + &CopyAllocationTagsData, + &DestroyAllocationTagsData); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/heap_profiler.h b/yt/yt/library/ytprof/heap_profiler.h new file mode 100644 index 00000000000..55752b188de --- /dev/null +++ b/yt/yt/library/ytprof/heap_profiler.h @@ -0,0 +1,40 @@ +#pragma once + +#include <yt/yt/library/ytprof/profile.pb.h> + +#include <yt/yt/core/tracing/public.h> + +#include <util/generic/hash.h> + +#include <tcmalloc/malloc_extension.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +NProto::Profile ConvertAllocationProfile(const tcmalloc::Profile& snapshot); + +NProto::Profile ReadHeapProfile(tcmalloc::ProfileType profileType); + +int AbslStackUnwinder(void** frames, int*, + int maxFrames, int skipFrames, + const void*, + int*); + +typedef uintptr_t TMemoryTag; + +TMemoryTag GetMemoryTag(); + +TMemoryTag SetMemoryTag(TMemoryTag newTag); + +THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage(); + +void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot); + +i64 GetEstimatedMemoryUsage(TMemoryTag tag); + +void EnableMemoryProfilingTags(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/profile.cpp b/yt/yt/library/ytprof/profile.cpp new file mode 100644 index 00000000000..ed4cc3f9852 --- /dev/null +++ b/yt/yt/library/ytprof/profile.cpp @@ -0,0 +1,34 @@ +#include "profile.h" + +#include <util/stream/str.h> +#include <util/stream/zlib.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +void WriteProfile(IOutputStream* out, const NProto::Profile& profile) +{ + TZLibCompress compress(out, ZLib::StreamType::GZip); + profile.SerializeToArcadiaStream(&compress); + compress.Finish(); +} + +TString SerializeProfile(const NProto::Profile& profile) +{ + TStringStream stream; + WriteProfile(&stream, profile); + return stream.Str(); +} + +void ReadProfile(IInputStream* in, NProto::Profile* profile) +{ + profile->Clear(); + + TZLibDecompress decompress(in); + profile->ParseFromArcadiaStream(&decompress); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/profile.h b/yt/yt/library/ytprof/profile.h new file mode 100644 index 00000000000..6283c200d27 --- /dev/null +++ b/yt/yt/library/ytprof/profile.h @@ -0,0 +1,19 @@ +#pragma once + +#include <util/stream/fwd.h> + +#include <yt/yt/library/ytprof/profile.pb.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +void WriteProfile(IOutputStream* out, const NProto::Profile& profile); + +TString SerializeProfile(const NProto::Profile& profile); + +void ReadProfile(IInputStream* in, NProto::Profile* profile); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/profile.proto b/yt/yt/library/ytprof/profile.proto new file mode 100644 index 00000000000..18eaba3e56a --- /dev/null +++ b/yt/yt/library/ytprof/profile.proto @@ -0,0 +1,209 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Profile is a common stacktrace profile format. +// +// Measurements represented with this format should follow the +// following conventions: +// +// - Consumers should treat unset optional fields as if they had been +// set with their default value. +// +// - When possible, measurements should be stored in "unsampled" form +// that is most useful to humans. There should be enough +// information present to determine the original sampled values. +// +// - On-disk, the serialized proto must be gzip-compressed. +// +// - The profile is represented as a set of samples, where each sample +// references a sequence of locations, and where each location belongs +// to a mapping. +// - There is a N->1 relationship from sample.location_id entries to +// locations. For every sample.location_id entry there must be a +// unique Location with that id. +// - There is an optional N->1 relationship from locations to +// mappings. For every nonzero Location.mapping_id there must be a +// unique Mapping with that id. + +syntax = "proto3"; + +package NYT.NYTProf.NProto; + +message Profile { + // A description of the samples associated with each Sample.value. + // For a cpu profile this might be: + // [["cpu","nanoseconds"]] or [["wall","seconds"]] or [["syscall","count"]] + // For a heap profile, this might be: + // [["allocations","count"], ["space","bytes"]], + // If one of the values represents the number of events represented + // by the sample, by convention it should be at index 0 and use + // sample_type.unit == "count". + repeated ValueType sample_type = 1; + // The set of samples recorded in this profile. + repeated Sample sample = 2; + // Mapping from address ranges to the image/binary/library mapped + // into that address range. mapping[0] will be the main binary. + repeated Mapping mapping = 3; + // Useful program location + repeated Location location = 4; + // Functions referenced by locations + repeated Function function = 5; + // A common table for strings referenced by various messages. + // string_table[0] must always be "". + repeated string string_table = 6; + // frames with Function.function_name fully matching the following + // regexp will be dropped from the samples, along with their successors. + int64 drop_frames = 7; // Index into string table. + // frames with Function.function_name fully matching the following + // regexp will be kept, even if it matches drop_functions. + int64 keep_frames = 8; // Index into string table. + + // The following fields are informational, do not affect + // interpretation of results. + + // Time of collection (UTC) represented as nanoseconds past the epoch. + int64 time_nanos = 9; + // Duration of the profile, if a duration makes sense. + int64 duration_nanos = 10; + // The kind of events between sampled ocurrences. + // e.g [ "cpu","cycles" ] or [ "heap","bytes" ] + ValueType period_type = 11; + // The number of events between sampled occurrences. + int64 period = 12; + // Freeform text associated to the profile. + repeated int64 comment = 13; // Indices into string table. + // Index into the string table of the type of the preferred sample + // value. If unset, clients should default to the last sample value. + int64 default_sample_type = 14; +} + +// ValueType describes the semantics and measurement units of a value. +message ValueType { + int64 type = 1; // Index into string table. + int64 unit = 2; // Index into string table. +} + +// Each Sample records values encountered in some program +// context. The program context is typically a stack trace, perhaps +// augmented with auxiliary information like the thread-id, some +// indicator of a higher level request being handled etc. +message Sample { + // The ids recorded here correspond to a Profile.location.id. + // The leaf is at location_id[0]. + repeated uint64 location_id = 1; + // The type and unit of each value is defined by the corresponding + // entry in Profile.sample_type. All samples must have the same + // number of values, the same as the length of Profile.sample_type. + // When aggregating multiple samples into a single sample, the + // result has a list of values that is the element-wise sum of the + // lists of the originals. + repeated int64 value = 2; + // label includes additional context for this sample. It can include + // things like a thread id, allocation size, etc + repeated Label label = 3; +} + +message Label { + int64 key = 1; // Index into string table + + // At most one of the following must be present + int64 str = 2; // Index into string table + int64 num = 3; + + // Should only be present when num is present. + // Specifies the units of num. + // Use arbitrary string (for example, "requests") as a custom count unit. + // If no unit is specified, consumer may apply heuristic to deduce the unit. + // Consumers may also interpret units like "bytes" and "kilobytes" as memory + // units and units like "seconds" and "nanoseconds" as time units, + // and apply appropriate unit conversions to these. + int64 num_unit = 4; // Index into string table +} + +message Mapping { + // Unique nonzero id for the mapping. + uint64 id = 1; + // Address at which the binary (or DLL) is loaded into memory. + uint64 memory_start = 2; + // The limit of the address range occupied by this mapping. + uint64 memory_limit = 3; + // Offset in the binary that corresponds to the first mapped address. + uint64 file_offset = 4; + // The object this entry is loaded from. This can be a filename on + // disk for the main binary and shared libraries, or virtual + // abstractions like "[vdso]". + int64 filename = 5; // Index into string table + // A string that uniquely identifies a particular program version + // with high probability. E.g., for binaries generated by GNU tools, + // it could be the contents of the .note.gnu.build-id field. + int64 build_id = 6; // Index into string table + + // The following fields indicate the resolution of symbolic info. + bool has_functions = 7; + bool has_filenames = 8; + bool has_line_numbers = 9; + bool has_inline_frames = 10; +} + +// Describes function and line table debug information. +message Location { + // Unique nonzero id for the location. A profile could use + // instruction addresses or any integer sequence as ids. + uint64 id = 1; + // The id of the corresponding profile.Mapping for this location. + // It can be unset if the mapping is unknown or not applicable for + // this profile type. + uint64 mapping_id = 2; + // The instruction address for this location, if available. It + // should be within [Mapping.memory_start...Mapping.memory_limit] + // for the corresponding mapping. A non-leaf address may be in the + // middle of a call instruction. It is up to display tools to find + // the beginning of the instruction if necessary. + uint64 address = 3; + // Multiple line indicates this location has inlined functions, + // where the last entry represents the caller into which the + // preceding entries were inlined. + // + // E.g., if memcpy() is inlined into printf: + // line[0].function_name == "memcpy" + // line[1].function_name == "printf" + repeated Line line = 4; + // Provides an indication that multiple symbols map to this location's + // address, for example due to identical code folding by the linker. In that + // case the line information above represents one of the multiple + // symbols. This field must be recomputed when the symbolization state of the + // profile changes. + bool is_folded = 5; +} + +message Line { + // The id of the corresponding profile.Function for this line. + uint64 function_id = 1; + // Line number in source code. + int64 line = 2; +} + +message Function { + // Unique nonzero id for the function. + uint64 id = 1; + // Name of the function, in human-readable form if available. + int64 name = 2; // Index into string table + // Name of the function, as identified by the system. + // For instance, it can be a C++ mangled name. + int64 system_name = 3; // Index into string table + // Source file containing the function. + int64 filename = 4; // Index into string table + // Line number in source file. + int64 start_line = 5; +} diff --git a/yt/yt/library/ytprof/queue-inl.h b/yt/yt/library/ytprof/queue-inl.h new file mode 100644 index 00000000000..bc030d52fa0 --- /dev/null +++ b/yt/yt/library/ytprof/queue-inl.h @@ -0,0 +1,72 @@ +#ifndef QUEUE_INL_H_ +#error "Direct inclusion of this file is not allowed, include queue.h" +#include "queue.h" +#endif +#undef QUEUE_INL_H_ + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +TStaticQueue::TStaticQueue(size_t logSize) + : Size_(1 << logSize) + , Buffer_(Size_) +{ } + +template <class TFn> +bool TStaticQueue::TryPush(const TFn& getIp) +{ + auto start = WriteSeq_.load(std::memory_order::relaxed); + auto end = ReadSeq_.load(std::memory_order::acquire) + Size_; + + if (start + 1 >= end) { + return false; + } + + int count = 0; + while (true) { + auto [ip, ok] = getIp(); + if (!ok) { + break; + } + + if (start + count + 1 >= end) { + return false; + } + + Buffer_[ToIndex(start+count+1)] = reinterpret_cast<intptr_t>(ip); + count++; + } + + Buffer_[ToIndex(start)] = count; + WriteSeq_.store(start+count+1, std::memory_order::release); + return true; +} + +template <class TFn> +bool TStaticQueue::TryPop(const TFn& onIp) +{ + auto start = ReadSeq_.load(std::memory_order::relaxed); + auto end = WriteSeq_.load(std::memory_order::acquire); + + if (start == end) { + return false; + } + + auto count = Buffer_[ToIndex(start)]; + for (int i = 0; i < count; i++) { + onIp(reinterpret_cast<void*>(Buffer_[ToIndex(start+i+1)])); + } + + ReadSeq_.store(start+count+1, std::memory_order::release); + return true; +} + +size_t TStaticQueue::ToIndex(i64 seq) const +{ + return seq & (Size_ - 1); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/queue.h b/yt/yt/library/ytprof/queue.h new file mode 100644 index 00000000000..ec6617d4131 --- /dev/null +++ b/yt/yt/library/ytprof/queue.h @@ -0,0 +1,45 @@ +#pragma once + +#include <atomic> +#include <optional> +#include <vector> + +#include <util/generic/fwd.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +/*! + * TStaticQueue is SPSC thread and signal safe queue that does not perform + * any allocations. + */ +class TStaticQueue +{ +public: + inline TStaticQueue(size_t logSize); + + template <class TFn> + bool TryPush(const TFn& getIp); + + template <class TFn> + bool TryPop(const TFn& onIp); + +private: + const i64 Size_; + + std::atomic<i64> WriteSeq_{0}; + std::atomic<i64> ReadSeq_{0}; + + std::vector<intptr_t> Buffer_; + + inline size_t ToIndex(i64 seq) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf + +#define QUEUE_INL_H_ +#include "queue-inl.h" +#undef QUEUE_INL_H_ diff --git a/yt/yt/library/ytprof/signal_safe_profiler.cpp b/yt/yt/library/ytprof/signal_safe_profiler.cpp new file mode 100644 index 00000000000..4312e7527a2 --- /dev/null +++ b/yt/yt/library/ytprof/signal_safe_profiler.cpp @@ -0,0 +1,370 @@ +#include "signal_safe_profiler.h" +#include "symbolize.h" + +#if defined(_linux_) +#include <link.h> +#include <sys/syscall.h> +#include <sys/prctl.h> + +#include <util/system/yield.h> + +#include <csignal> +#include <functional> + +#include <util/generic/yexception.h> +#endif + +#include <yt/yt/core/misc/proc.h> + +#include <library/cpp/yt/cpu_clock/clock.h> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +TProfileLocation::operator size_t() const +{ + size_t hash = Tid; + hash = CombineHashes(hash, std::hash<TString>()(ThreadName)); + + for (auto ip : Backtrace) { + hash = CombineHashes(hash, ip); + } + + for (const auto& tag : Tags) { + hash = CombineHashes(hash, + CombineHashes( + std::hash<TString>{}(tag.first), + std::hash<std::variant<TString, i64>>{}(tag.second))); + } + + return hash; +} + +//////////////////////////////////////////////////////////////////////////////// + +Y_WEAK void* AcquireFiberTagStorage() +{ + return nullptr; +} + +Y_WEAK std::vector<std::pair<TString, std::variant<TString, i64>>> ReadFiberTags(void* /* storage */) +{ + return {}; +} + +Y_WEAK void ReleaseFiberTagStorage(void* /* storage */) +{ } + +Y_WEAK TCpuInstant GetTraceContextTimingCheckpoint() +{ + return 0; +} + +//////////////////////////////////////////////////////////////////////////////// + +#if not defined(_linux_) + +void TSignalSafeProfiler::Start() +{ } + +void TSignalSafeProfiler::Stop() +{ } + +NProto::Profile TSignalSafeProfiler::ReadProfile() +{ + return {}; +} + +void TSignalSafeProfiler::RecordSample(NBacktrace::TFramePointerCursor* /*cursor*/, i64 /*value*/) +{ } + +void TSignalSafeProfiler::DequeueSamples() +{ } + +#endif + +//////////////////////////////////////////////////////////////////////////////// + +TSignalSafeProfiler::TSignalSafeProfiler(TSignalSafeProfilerOptions options) + : Options_(options) + , Queue_(options.RingBufferLogSize) +{ } + +TSignalSafeProfiler::~TSignalSafeProfiler() +{ + YT_VERIFY(Stop_); +} + +#if defined(_linux_) + +void TSignalSafeProfiler::Start() +{ + if (!IsProfileBuild()) { + throw yexception() << "frame pointers not available; rebuild with --build=profile"; + } + + EnableProfiler(); + Stop_ = false; + BackgroundThread_ = std::thread([this] { + DequeueSamples(); + }); +} + +void TSignalSafeProfiler::Stop() +{ + if (Stop_) { + return; + } + + Stop_ = true; + DisableProfiler(); + BackgroundThread_.join(); +} + +TCpuDuration GetActionRunTime() +{ + auto fiberStartTime = GetTraceContextTimingCheckpoint(); + if (fiberStartTime == 0) { + return 0; + } + + return GetCpuInstant() - fiberStartTime; +} + +void TSignalSafeProfiler::RecordSample(NBacktrace::TFramePointerCursor* cursor, i64 value) +{ + int count = 0; + bool pushTid = false; + bool pushFiberStorage = false; + bool pushValue = false; + bool pushActionRunTime = false; + int tagIndex = 0; + + auto tagsPtr = GetCpuProfilerTags(); + + uintptr_t threadName[2] = {}; + prctl(PR_GET_NAME, (unsigned long)threadName, 0UL, 0UL, 0UL); + int namePushed = 0; + + auto ok = Queue_.TryPush([&] () -> std::pair<const void*, bool> { + if (!pushValue) { + pushValue = true; + return {reinterpret_cast<const void*>(value), true}; + } + + if (!pushTid) { + pushTid = true; + return {reinterpret_cast<void*>(GetCurrentThreadId()), true}; + } + + if (Options_.RecordActionRunTime && !pushActionRunTime) { + pushActionRunTime = true; + return {reinterpret_cast<const void*>(GetActionRunTime()), true}; + } + + if (namePushed < 2) { + return {reinterpret_cast<const void*>(threadName[namePushed++]), true}; + } + + if (!pushFiberStorage) { + pushFiberStorage = true; + return {AcquireFiberTagStorage(), true}; + } + + if (tagIndex < MaxActiveTags) { + if (tagsPtr) { + auto tag = (*tagsPtr)[tagIndex].GetFromSignal(); + tagIndex++; + return {reinterpret_cast<const void*>(tag.Release()), true}; + } else { + tagIndex++; + return {nullptr, true}; + } + } + + if (count > Options_.MaxBacktraceSize) { + return {nullptr, false}; + } + + if (cursor->IsFinished()) { + return {nullptr, false}; + } + + auto ip = cursor->GetCurrentIP(); + if (count != 0) { + // First IP points to next executing instruction. + // All other IP's are return addresses. + // Substract 1 to get accurate line information for profiler. + ip = reinterpret_cast<const void*>(reinterpret_cast<uintptr_t>(ip) - 1); + } + + cursor->MoveNext(); + count++; + return {ip, true}; + }); + + if (!ok) { + QueueOverflows_++; + } +} + +void TSignalSafeProfiler::DequeueSamples() +{ + TProfileLocation sample; + while (!Stop_) { + Sleep(Options_.DequeuePeriod); + + while (true) { + sample.Backtrace.clear(); + sample.Tags.clear(); + + std::optional<i64> value; + std::optional<size_t> tid; + std::optional<TCpuDuration> actionRunTime; + uintptr_t threadName[2] = {}; + int namePopped = 0; + std::optional<void*> fiberStorage; + + int tagIndex = 0; + bool ok = Queue_.TryPop([&] (void* ip) { + if (!value) { + value = reinterpret_cast<i64>(ip); + return; + } + + if (!tid) { + tid = reinterpret_cast<size_t>(ip); + return; + } + + if (Options_.RecordActionRunTime && !actionRunTime) { + actionRunTime = reinterpret_cast<TCpuDuration>(ip); + return; + } + + if (namePopped < 2) { + threadName[namePopped++] = reinterpret_cast<uintptr_t>(ip); + return; + } + + if (!fiberStorage) { + fiberStorage = ip; + return; + } + + if (tagIndex < MaxActiveTags) { + auto tag = reinterpret_cast<TProfilerTag*>(ip); + if (tag) { + if (tag->StringValue) { + sample.Tags.emplace_back(tag->Name, *tag->StringValue); + } else { + sample.Tags.emplace_back(tag->Name, *tag->IntValue); + } + } + tagIndex++; + return; + } + + sample.Backtrace.push_back(reinterpret_cast<ui64>(ip)); + }); + + if (!ok) { + break; + } + + sample.ThreadName = TString{reinterpret_cast<char*>(threadName)}; + sample.Tid = *tid; + for (auto& tag : ReadFiberTags(*fiberStorage)) { + sample.Tags.push_back(std::move(tag)); + } + ReleaseFiberTagStorage(*fiberStorage); + + if (Options_.RecordActionRunTime) { + sample.Tags.emplace_back( + "action_run_time_us", + static_cast<i64>(CpuDurationToDuration(*actionRunTime).MicroSeconds())); + } + + auto& counter = Counters_[sample]; + counter.Count++; + counter.Total += *value; + } + } +} + +NProto::Profile TSignalSafeProfiler::ReadProfile() +{ + NProto::Profile profile; + profile.add_string_table(); + + THashMap<TString, ui64> stringTable; + auto stringify = [&] (const TString& str) -> i64 { + if (auto it = stringTable.find(str); it != stringTable.end()) { + return it->second; + } else { + auto nameId = profile.string_table_size(); + profile.add_string_table(str); + stringTable[str] = nameId; + return nameId; + } + }; + + AnnotateProfile(&profile, stringify); + + THashMap<uintptr_t, ui64> locations; + for (const auto& [backtrace, counters] : Counters_) { + auto sample = profile.add_sample(); + + sample->add_value(counters.Count); + sample->add_value(EncodeValue(counters.Total)); + + auto label = sample->add_label(); + label->set_key(stringify("tid")); + label->set_num(backtrace.Tid); + + label = sample->add_label(); + label->set_key(stringify("thread")); + label->set_str(stringify(backtrace.ThreadName)); + + for (auto tag : backtrace.Tags) { + auto label = sample->add_label(); + label->set_key(stringify(tag.first)); + + if (auto intValue = std::get_if<i64>(&tag.second)) { + label->set_num(*intValue); + } else if (auto strValue = std::get_if<TString>(&tag.second)) { + label->set_str(stringify(*strValue)); + } + } + + for (auto ip : backtrace.Backtrace) { + auto it = locations.find(ip); + if (it != locations.end()) { + sample->add_location_id(it->second); + continue; + } + + auto locationId = locations.size() + 1; + + auto location = profile.add_location(); + location->set_address(ip); + location->set_id(locationId); + + sample->add_location_id(locationId); + locations[ip] = locationId; + } + } + + if (QueueOverflows_ > 0) { + profile.add_comment(stringify("ytprof.queue_overflows=" + std::to_string(QueueOverflows_))); + } + + return profile; +} + +#endif + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/signal_safe_profiler.h b/yt/yt/library/ytprof/signal_safe_profiler.h new file mode 100644 index 00000000000..d09ba34dd3d --- /dev/null +++ b/yt/yt/library/ytprof/signal_safe_profiler.h @@ -0,0 +1,95 @@ +#pragma once + +#include "queue.h" + +#include <yt/yt/library/ytprof/profile.pb.h> +#include <yt/yt/library/ytprof/api/api.h> + +#include <library/cpp/yt/memory/intrusive_ptr.h> +#include <library/cpp/yt/memory/safe_memory_reader.h> + +#include <library/cpp/yt/backtrace/cursors/frame_pointer/frame_pointer_cursor.h> + +#include <util/generic/hash.h> + +#include <util/datetime/base.h> + +#include <thread> +#include <array> +#include <variant> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +struct TProfileLocation +{ + size_t Tid = 0; + TString ThreadName; + std::vector<std::pair<TString, std::variant<TString, i64>>> Tags; + std::vector<ui64> Backtrace; + + bool operator == (const TProfileLocation& other) const = default; + operator size_t() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSignalSafeProfilerOptions +{ +public: + TDuration DequeuePeriod = TDuration::MilliSeconds(100); + int MaxBacktraceSize = 256; + int RingBufferLogSize = 20; // 1 MiB + bool RecordActionRunTime = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSignalSafeProfiler +{ +public: + explicit TSignalSafeProfiler(TSignalSafeProfilerOptions options = {}); + virtual ~TSignalSafeProfiler(); + + void Start(); + void Stop(); + + //! ReadProfile returns accumulated profile. + /*! + * This function may be called only after profiler is stopped. + */ + NProto::Profile ReadProfile(); + +protected: + const TSignalSafeProfilerOptions Options_; + + TSafeMemoryReader Reader_; + + std::atomic<bool> Stop_ = true; + std::atomic<i64> QueueOverflows_ = 0; + + TStaticQueue Queue_; + std::thread BackgroundThread_; + + struct TProfileCounter + { + i64 Count = 0; + i64 Total = 0; + }; + + THashMap<TProfileLocation, TProfileCounter> Counters_; + + virtual void EnableProfiler() = 0; + virtual void DisableProfiler() = 0; + virtual void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) = 0; + virtual i64 EncodeValue(i64 value) = 0; + + void RecordSample(NBacktrace::TFramePointerCursor* cursor, i64 value); + + void DequeueSamples(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/spinlock_profiler.cpp b/yt/yt/library/ytprof/spinlock_profiler.cpp new file mode 100644 index 00000000000..a9b3b6b2482 --- /dev/null +++ b/yt/yt/library/ytprof/spinlock_profiler.cpp @@ -0,0 +1,225 @@ +#include "spinlock_profiler.h" + +#include <library/cpp/yt/backtrace/cursors/interop/interop.h> + +#include <absl/base/internal/spinlock.h> +#include <absl/base/internal/cycleclock.h> + +#include <util/system/yield.h> + +#include <mutex> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +TSpinlockProfiler::TSpinlockProfiler(TSpinlockProfilerOptions options) + : TSignalSafeProfiler(options) + , Options_(options) +{ } + +TSpinlockProfiler::~TSpinlockProfiler() +{ + Stop(); +} + +std::atomic<int> TSpinlockProfiler::SamplingRate_; +std::atomic<TSpinlockProfiler*> TSpinlockProfiler::ActiveProfiler_; +std::atomic<bool> TSpinlockProfiler::HandlingEvent_; +std::once_flag TSpinlockProfiler::HookInitialized_; + +void TSpinlockProfiler::EnableProfiler() +{ + std::call_once(HookInitialized_, [] { + absl::base_internal::RegisterSpinLockProfiler(&TSpinlockProfiler::OnEvent); + return true; + }); + + TSpinlockProfiler* expected = nullptr; + if (!ActiveProfiler_.compare_exchange_strong(expected, this)) { + throw yexception() << "Another instance of spinlock profiler is running"; + } + SamplingRate_ = Options_.ProfileFraction; +} + +void TSpinlockProfiler::DisableProfiler() +{ + SamplingRate_ = 0; + ActiveProfiler_ = nullptr; + while (HandlingEvent_) { + SchedYield(); + } +} + +void TSpinlockProfiler::RecordEvent(const void* /*lock*/, int64_t waitCycles) +{ + unw_context_t uwContext; + YT_VERIFY(unw_getcontext(&uwContext) == 0); + + unw_cursor_t unwCursor; + YT_VERIFY(unw_init_local(&unwCursor, &uwContext) == 0); + + auto fpCursorContext = NBacktrace::FramePointerCursorContextFromLibunwindCursor(unwCursor); + NBacktrace::TFramePointerCursor fpCursor(&Reader_, fpCursorContext); + RecordSample(&fpCursor, waitCycles); +} + +static thread_local int SpinlockEventCount; + +void TSpinlockProfiler::OnEvent(const void* lock, int64_t waitCycles) +{ + auto samplingRate = SamplingRate_.load(std::memory_order::relaxed); + if (samplingRate == 0) { + return; + } + + if (SpinlockEventCount < samplingRate) { + SpinlockEventCount++; + return; + } + + SpinlockEventCount = 0; + while (HandlingEvent_.exchange(true)) { + SchedYield(); + } + + if (auto profiler = ActiveProfiler_.load(); profiler) { + profiler->RecordEvent(lock, waitCycles); + } + + HandlingEvent_.store(false); +} + +void TSpinlockProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) +{ + auto sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("sample")); + sampleType->set_unit(stringify("count")); + + sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("cpu")); + sampleType->set_unit(stringify("nanoseconds")); + + auto periodType = profile->mutable_period_type(); + periodType->set_type(stringify("sample")); + periodType->set_unit(stringify("count")); + + profile->set_period(Options_.ProfileFraction); +} + +i64 TSpinlockProfiler::EncodeValue(i64 value) +{ + return value / absl::base_internal::CycleClock::Frequency() * 1e9; +} + +//////////////////////////////////////////////////////////////////////////////// + + +TBlockingProfiler::TBlockingProfiler(TSpinlockProfilerOptions options) + : TSignalSafeProfiler(options) + , Options_(options) +{ } + +TBlockingProfiler::~TBlockingProfiler() +{ + Stop(); +} + +std::atomic<int> TBlockingProfiler::SamplingRate_; +std::atomic<TBlockingProfiler*> TBlockingProfiler::ActiveProfiler_; +std::atomic<bool> TBlockingProfiler::HandlingEvent_; +std::once_flag TBlockingProfiler::HookInitialized_; + +void TBlockingProfiler::EnableProfiler() +{ + std::call_once(HookInitialized_, [] { + NThreading::RegisterSpinWaitSlowPathHook(&TBlockingProfiler::OnEvent); + return true; + }); + + TBlockingProfiler* expected = nullptr; + if (!ActiveProfiler_.compare_exchange_strong(expected, this)) { + throw yexception() << "Another instance of spinlock profiler is running"; + } + SamplingRate_ = Options_.ProfileFraction; +} + +void TBlockingProfiler::DisableProfiler() +{ + SamplingRate_ = 0; + ActiveProfiler_ = nullptr; + while (HandlingEvent_) { + SchedYield(); + } +} + +void TBlockingProfiler::RecordEvent( + TCpuDuration cpuDelay, + const ::TSourceLocation& /*location*/, + NThreading::ESpinLockActivityKind /*activityKind*/) +{ + unw_context_t unwContext; + YT_VERIFY(unw_getcontext(&unwContext) == 0); + + unw_cursor_t unwCursor; + YT_VERIFY(unw_init_local(&unwCursor, &unwContext) == 0); + + auto fpCursorContext = NBacktrace::FramePointerCursorContextFromLibunwindCursor(unwCursor); + NBacktrace::TFramePointerCursor fpCursor(&Reader_, fpCursorContext); + RecordSample(&fpCursor, cpuDelay); +} + +static thread_local int YTSpinlockEventCount; + +void TBlockingProfiler::OnEvent( + TCpuDuration cpuDelay, + const ::TSourceLocation& location, + NThreading::ESpinLockActivityKind activityKind) +{ + auto samplingRate = SamplingRate_.load(std::memory_order::relaxed); + if (samplingRate == 0) { + return; + } + + if (YTSpinlockEventCount < samplingRate) { + YTSpinlockEventCount++; + return; + } + + YTSpinlockEventCount = 0; + while (HandlingEvent_.exchange(true)) { + SchedYield(); + } + + if (auto profiler = ActiveProfiler_.load(); profiler) { + profiler->RecordEvent(cpuDelay, location, activityKind); + } + + HandlingEvent_.store(false); +} + +void TBlockingProfiler::AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) +{ + auto sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("sample")); + sampleType->set_unit(stringify("count")); + + sampleType = profile->add_sample_type(); + sampleType->set_type(stringify("cpu")); + sampleType->set_unit(stringify("nanoseconds")); + + auto periodType = profile->mutable_period_type(); + periodType->set_type(stringify("sample")); + periodType->set_unit(stringify("count")); + + profile->set_period(Options_.ProfileFraction); +} + +i64 TBlockingProfiler::EncodeValue(i64 value) +{ + return CpuDurationToDuration(value).NanoSeconds(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/spinlock_profiler.h b/yt/yt/library/ytprof/spinlock_profiler.h new file mode 100644 index 00000000000..6df576c3b7a --- /dev/null +++ b/yt/yt/library/ytprof/spinlock_profiler.h @@ -0,0 +1,82 @@ +#pragma once + +#include "signal_safe_profiler.h" + +#include <library/cpp/yt/threading/spin_wait_hook.h> + +#include <mutex> + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +struct TSpinlockProfilerOptions + : public TSignalSafeProfilerOptions +{ + int ProfileFraction = 100; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// TSpinlockProfiler profiles wait events from absl spinlock. +class TSpinlockProfiler + : public TSignalSafeProfiler +{ +public: + explicit TSpinlockProfiler(TSpinlockProfilerOptions options); + ~TSpinlockProfiler(); + +private: + const TSpinlockProfilerOptions Options_; + + static std::atomic<int> SamplingRate_; + static std::atomic<TSpinlockProfiler*> ActiveProfiler_; + static std::atomic<bool> HandlingEvent_; + static std::once_flag HookInitialized_; + + void EnableProfiler() override; + void DisableProfiler() override; + void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override; + i64 EncodeValue(i64 value) override; + + static void OnEvent(const void *lock, int64_t waitCycles); + void RecordEvent(const void *lock, int64_t waitCycles); +}; + +//////////////////////////////////////////////////////////////////////////////// + +// TBlockingProfiler profiles wait events from yt spinlocks. +class TBlockingProfiler + : public TSignalSafeProfiler +{ +public: + explicit TBlockingProfiler(TSpinlockProfilerOptions options); + ~TBlockingProfiler(); + +private: + const TSpinlockProfilerOptions Options_; + + static std::atomic<int> SamplingRate_; + static std::atomic<TBlockingProfiler*> ActiveProfiler_; + static std::atomic<bool> HandlingEvent_; + static std::once_flag HookInitialized_; + + void EnableProfiler() override; + void DisableProfiler() override; + void AnnotateProfile(NProto::Profile* profile, const std::function<i64(const TString&)>& stringify) override; + i64 EncodeValue(i64 value) override; + + static void OnEvent( + TCpuDuration cpuDelay, + const ::TSourceLocation& location, + NThreading::ESpinLockActivityKind activityKind); + + void RecordEvent( + TCpuDuration cpuDelay, + const ::TSourceLocation& location, + NThreading::ESpinLockActivityKind activityKind); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/symbolize.cpp b/yt/yt/library/ytprof/symbolize.cpp new file mode 100644 index 00000000000..553a34c4e34 --- /dev/null +++ b/yt/yt/library/ytprof/symbolize.cpp @@ -0,0 +1,809 @@ +#include "symbolize.h" + +#include <library/cpp/yt/assert/assert.h> + +#include <util/folder/path.h> + +#include <util/generic/yexception.h> +#include <util/generic/hash.h> + +#include <util/string/printf.h> + +#include <util/system/filemap.h> +#include <util/system/type_name.h> +#include <util/system/unaligned_mem.h> + +#include <dlfcn.h> +#include <link.h> +#include <elf.h> + +#ifndef YT_NO_AUXV +#include <sys/auxv.h> +#endif + +#include <exception> +#include <cxxabi.h> + +namespace NYT::NYTProf { + +class TDLAddrSymbolizer +{ +public: + explicit TDLAddrSymbolizer(NProto::Profile* profile) + : Profile_(profile) + { } + + void Symbolize() + { + for (int i = 0; i < Profile_->function_size(); i++) { + auto function = Profile_->mutable_function(i); + + void* ip = reinterpret_cast<void*>(function->id()); + + Dl_info dlinfo{}; + if (dladdr(ip, &dlinfo) == 0) { + continue; + } + + TString name; + TString demangledName; + if (!dlinfo.dli_sname) { + auto offset = reinterpret_cast<intptr_t>(ip) - reinterpret_cast<intptr_t>(dlinfo.dli_fbase); + auto filename = TFsPath{dlinfo.dli_fname}.Basename(); + name = Sprintf("%p", reinterpret_cast<void*>(offset)) + "@" + filename; + demangledName = name; + } else { + name = dlinfo.dli_sname; + demangledName = CppDemangle(dlinfo.dli_sname); + } + + function->set_name(SymbolizeString(demangledName)); + function->set_system_name(SymbolizeString(name)); + } + } + +private: + NProto::Profile* const Profile_; + + THashMap<TString, ui64> Strings_; + + ui64 SymbolizeString(const TString& str) + { + auto it = Strings_.find(str); + if (it != Strings_.end()) { + return it->second; + } + + auto id = Profile_->string_table_size(); + Strings_[str] = id; + Profile_->add_string_table(str); + return id; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +using TElfAddr = ElfW(Addr); +using TElfDyn = ElfW(Dyn); +using TElfEhdr = ElfW(Ehdr); +using TElfOff = ElfW(Off); +using TElfPhdr = ElfW(Phdr); +using TElfShdr = ElfW(Shdr); +using TElfNhdr = ElfW(Nhdr); +using TElfSym = ElfW(Sym); +using TElfWord = ElfW(Word); + +class TElf final +{ +public: + class TSection + { + public: + const TElfShdr& Header; + + const char* Name() const + { + if (!Elf_.SectionNames_) { + throw yexception() << "Section names are not initialized"; + } + + if (Header.sh_name > Elf_.SectionNamesSize_) { + throw yexception() << "Section name point outside of strings table"; + } + + return Elf_.SectionNames_ + Header.sh_name; + } + + const char* begin() const + { + return Elf_.Mapped_ + Header.sh_offset; + } + + const char* end() const + { + return begin() + size(); + } + + size_t size() const + { + return Header.sh_size; + } + + TSection(const TElfShdr& header, const TElf& elf) + : Header(header) + , Elf_(elf) + { } + + private: + const TElf& Elf_; + }; + + explicit TElf(const TString& path) + : FileMap_(path) + { + FileMap_.Map(0, FileMap_.GetFile().GetLength()); + + ElfSize_ = FileMap_.MappedSize(); + Mapped_ = reinterpret_cast<const char*>(FileMap_.Ptr()); + + if (ElfSize_ < sizeof(TElfEhdr)) { + throw yexception() << "The size of ELF file is too small: " << ElfSize_; + } + + Header_ = reinterpret_cast<const TElfEhdr*>(Mapped_); + + if (memcmp(Header_->e_ident, "\x7F""ELF", 4) != 0) { + throw yexception() << "The file is not ELF according to magic"; + } + + TElfOff sectionHeaderOffset = Header_->e_shoff; + uint16_t sectionHeaderNumEntries = Header_->e_shnum; + + if (!sectionHeaderOffset || + !sectionHeaderNumEntries || + sectionHeaderOffset + sectionHeaderNumEntries * sizeof(TElfShdr) > ElfSize_) + { + throw yexception() << "The ELF is truncated (section header points after end of file)"; + } + + SectionHeaders_ = reinterpret_cast<const TElfShdr*>(Mapped_ + sectionHeaderOffset); + + auto sectionStrtab = FindSection([&] (const TSection& section, size_t idx) { + return section.Header.sh_type == SHT_STRTAB && Header_->e_shstrndx == idx; + }); + + if (!sectionStrtab) { + throw yexception() << "The ELF doesn't have string table with section names"; + } + + TElfOff sectionNamesOffset = sectionStrtab->Header.sh_offset; + if (sectionNamesOffset >= ElfSize_) { + throw yexception() << "The ELF is truncated (section names string table points after end of file)"; + } + if (sectionNamesOffset + SectionNamesSize_ > ElfSize_) { + throw yexception() << "The ELF is truncated (section names string table is truncated)"; + } + + SectionNames_ = reinterpret_cast<const char *>(Mapped_ + sectionNamesOffset); + SectionNamesSize_ = sectionStrtab->Header.sh_offset; + + TElfOff programHeaderOffset = Header_->e_phoff; + uint16_t programHeaderNumEntries = Header_->e_phnum; + + if (!programHeaderOffset || + !programHeaderNumEntries || + programHeaderOffset + programHeaderNumEntries * sizeof(TElfPhdr) > ElfSize_ + ) { + throw yexception() << "The ELF is truncated (program header points after end of file)"; + } + + ProgramHeaders_ = reinterpret_cast<const TElfPhdr*>(Mapped_ + programHeaderOffset); + } + + bool IterateSections(std::function<bool(const TSection& section, size_t idx)> pred) const + { + for (size_t idx = 0; idx < Header_->e_shnum; ++idx) { + TSection section(SectionHeaders_[idx], *this); + + if (section.Header.sh_offset + section.Header.sh_size > ElfSize_) { + continue; + } + + if (pred(section, idx)) { + return true; + } + } + return false; + } + + std::optional<TSection> FindSection(std::function<bool(const TSection& section, size_t idx)> pred) const + { + std::optional<TSection> result; + + IterateSections([&] (const TSection& section, size_t idx) { + if (pred(section, idx)) { + result.emplace(section); + return true; + } + return false; + }); + + return result; + } + + std::optional<TSection> FindSectionByName(const char* name) const + { + return FindSection([&] (const TSection& section, size_t) { return 0 == strcmp(name, section.Name()); }); + } + + const char* begin() const { return Mapped_; } + const char* end() const { return Mapped_ + ElfSize_; } + size_t size() const { return ElfSize_; } + + TString GetBuildId() const + { + for (size_t idx = 0; idx < Header_->e_phnum; ++idx) { + const TElfPhdr& phdr = ProgramHeaders_[idx]; + + if (phdr.p_type == PT_NOTE) { + return GetBuildId(Mapped_ + phdr.p_offset, phdr.p_filesz); + } + } + + return {}; + } + + static TString GetBuildId(const char* nhdrPos, size_t nhdrSize) + { + const char* nhdrEnd = nhdrPos + nhdrSize; + + while (nhdrPos < nhdrEnd) { + TElfNhdr nhdr = ReadUnaligned<TElfNhdr>(nhdrPos); + + nhdrPos += sizeof(TElfNhdr) + nhdr.n_namesz; + if (nhdr.n_type == NT_GNU_BUILD_ID) { + const char* build_id = nhdrPos; + return {build_id, nhdr.n_descsz}; + } + nhdrPos += nhdr.n_descsz; + } + + return {}; + } + +private: + TFileMap FileMap_; + + size_t ElfSize_ = 0; + const char* Mapped_ = nullptr; + const TElfEhdr* Header_; + const TElfShdr* SectionHeaders_; + const TElfPhdr* ProgramHeaders_; + + const char* SectionNames_ = nullptr; + size_t SectionNamesSize_ = 0; +}; + +using TElfPtr = std::shared_ptr<TElf>; + +//////////////////////////////////////////////////////////////////////////////// + +std::optional<std::pair<void*, void*>> GetExecutableRange(dl_phdr_info* info) +{ + const TElfPhdr* load = nullptr; + for (int i = 0; i < info->dlpi_phnum; i++) { + load = &(info->dlpi_phdr[i]); + if (load->p_type == PT_LOAD && (load->p_flags & PF_X) != 0) { + break; + } + } + + if (!load) { + return {}; + } + + return std::pair{ + reinterpret_cast<void*>(info->dlpi_addr + load->p_vaddr), + reinterpret_cast<void*>(info->dlpi_addr + load->p_vaddr + load->p_memsz) + }; +} + +#if defined(_msan_enabled_) +extern "C" void __msan_unpoison_string(const volatile void* a); +#endif + +class TSymbolIndex +{ +public: + TSymbolIndex() + { + dl_iterate_phdr(CollectSymbols, this); + + std::sort(Objects_.begin(), Objects_.end(), [] (const TObject& a, const TObject& b) { return a.AddressBegin < b.AddressBegin; }); + std::sort(Symbols_.begin(), Symbols_.end(), [] (const TSymbol& a, const TSymbol& b) { return a.AddressBegin < b.AddressBegin; }); + + /// We found symbols both from loaded program headers and from ELF symbol tables. + Symbols_.erase(std::unique(Symbols_.begin(), Symbols_.end(), [] (const TSymbol &a, const TSymbol& b) { + return a.AddressBegin == b.AddressBegin && a.AddressEnd == b.AddressEnd; + }), Symbols_.end()); + } + + static int CollectSymbols(dl_phdr_info* info, size_t, void* ptr) + { + TSymbolIndex* symbolIndex = reinterpret_cast<TSymbolIndex*>(ptr); + + symbolIndex->CollectSymbolsFromProgramHeaders(info); + symbolIndex->CollectSymbolsFromElf(info); + + /* Continue iterations */ + return 0; + } + + struct TSymbol + { + const void* AddressBegin; + const void* AddressEnd; + const char* Name; + }; + + struct TObject + { + const void* AddressBegin; + const void* AddressEnd; + + TString Name; + TString BuildId; + + TElfPtr Elf; + }; + + const TSymbol* FindSymbol(const void* address) const + { + return Find(address, Symbols_); + } + + const TObject* FindObject(const void* address) const + { + return Find(address, Objects_); + } + + const std::vector<TSymbol>& Symbols() const + { + return Symbols_; + } + + const std::vector<TObject>& Objects() const + { + return Objects_; + } + + static TString GetBuildId(dl_phdr_info* info) + { + for (size_t header_index = 0; header_index < info->dlpi_phnum; ++header_index) { + const auto& phdr = info->dlpi_phdr[header_index]; + if (phdr.p_type != PT_NOTE) + continue; + + return TElf::GetBuildId(reinterpret_cast<const char *>(info->dlpi_addr + phdr.p_vaddr), phdr.p_memsz); + } + return {}; + } + +private: + std::vector<TSymbol> Symbols_; + std::vector<TObject> Objects_; + + THashMap<TString, TElfPtr> ObjectNameToElf_; + + template <typename T> + static const T* Find(const void* address, const std::vector<T>& vec) + { + auto it = std::lower_bound(vec.begin(), vec.end(), address, [] (const T& symbol, const void* addr) { + return symbol.AddressBegin <= addr; + }); + + if (it == vec.begin()) { + return nullptr; + } else { + --it; /// Last range that has left boundary less or equals than address. + } + + if (address >= it->AddressBegin && address < it->AddressEnd) { + return &*it; + } else { + return nullptr; + } + } + + void CollectSymbolsFromProgramHeaders(dl_phdr_info* info) + { + /* Iterate over all headers of the current shared lib + * (first call is for the executable itself) + */ + for (size_t headerIndex = 0; headerIndex < info->dlpi_phnum; ++headerIndex) { + /* Further processing is only needed if the dynamic section is reached + */ + if (info->dlpi_phdr[headerIndex].p_type != PT_DYNAMIC) + continue; + + /* Get a pointer to the first entry of the dynamic section. + * It's address is the shared lib's address + the virtual address + */ + const auto* dynBegin = reinterpret_cast<const TElfDyn*>(info->dlpi_addr + info->dlpi_phdr[headerIndex].p_vaddr); + + /// For unknown reason, addresses are sometimes relative sometimes absolute. + auto correctAddress = [] (TElfAddr base, TElfAddr ptr) { + return ptr > base ? ptr : base + ptr; + }; + + /* Iterate over all entries of the dynamic section until the + * end of the symbol table is reached. This is indicated by + * an entry with d_tag == DT_NULL. + */ + + size_t symCnt = 0; + for (const auto * it = dynBegin; it->d_tag != DT_NULL; ++it) { + if (it->d_tag == DT_GNU_HASH) { + /// This code based on Musl-libc. + + const uint32_t* buckets = nullptr; + const uint32_t* hashval = nullptr; + + const auto* hash = reinterpret_cast<const TElfWord*>(correctAddress(info->dlpi_addr, it->d_un.d_ptr)); + + buckets = hash + 4 + (hash[2] * sizeof(size_t) / 4); + + for (TElfWord index = 0; index < hash[0]; ++index) { + if (buckets[index] > symCnt) { + symCnt = buckets[index]; + } + } + + if (symCnt) { + symCnt -= hash[1]; + hashval = buckets + hash[0] + symCnt; + do { + ++symCnt; + } while (!(*hashval++ & 1)); + } + + break; + } + } + + if (!symCnt) { + continue; + } + + const char* strtab = nullptr; + for (const auto* it = dynBegin; it->d_tag != DT_NULL; ++it) { + if (it->d_tag == DT_STRTAB) { + strtab = reinterpret_cast<const char *>(correctAddress(info->dlpi_addr, it->d_un.d_ptr)); + break; + } + } + + if (!strtab) { + continue; + } + + for (const auto* it = dynBegin; it->d_tag != DT_NULL; ++it) { + if (it->d_tag == DT_SYMTAB) { + /* Get the pointer to the first entry of the symbol table */ + const auto* elfSym = reinterpret_cast<const TElfSym*>(correctAddress(info->dlpi_addr, it->d_un.d_ptr)); + + /* Iterate over the symbol table */ + for (TElfWord symIndex = 0; symIndex < static_cast<TElfWord>(symCnt); ++symIndex) { + /// We are not interested in empty symbols. + if (!elfSym[symIndex].st_size) { + continue; + } + + /* Get the name of the sym_index-th symbol. + * This is located at the address of st_name relative to the beginning of the string table. + */ + const auto* symName = &strtab[elfSym[symIndex].st_name]; + if (!symName) { + continue; + } + + Symbols_.push_back(TSymbol{ + .AddressBegin = reinterpret_cast<const void *>(info->dlpi_addr + elfSym[symIndex].st_value), + .AddressEnd = reinterpret_cast<const void *>(info->dlpi_addr + elfSym[symIndex].st_value + elfSym[symIndex].st_size), + .Name = symName, + }); + } + + break; + } + } + } + } + + void CollectSymbolsFromElf(dl_phdr_info* info) + { + /// MSan does not know that the program segments in memory are initialized. +#if defined(_msan_enabled_) + __msan_unpoison_string(info->dlpi_name); +#endif + + TString objectName = info->dlpi_name; + auto buildId = GetBuildId(info); + + /// If the name is empty and there is a non-empty build-id - it's main executable. + /// Find a elf file for the main executable and set the build-id. + if (objectName.empty()) { + objectName = TFsPath{"/proc/self/exe"}.ReadLink(); + } else { + TFsPath debugInfoPath = TFsPath("/usr/lib/debug") / TFsPath{objectName}.Basename(); + if (debugInfoPath.Exists()) { + objectName = debugInfoPath; + } + } + + auto range = GetExecutableRange(info); + if (!range) { + return; + } + + TObject object { + .AddressBegin = range->first, + .AddressEnd = range->second, + .Name = objectName, + .BuildId = buildId, + }; + + TElfPtr elf; + if (auto it = ObjectNameToElf_.find(objectName); it != ObjectNameToElf_.end()) { + elf = it->second; + } else { + if (TFsPath{objectName}.Exists()) { + elf = std::make_shared<TElf>(objectName); + if (elf->GetBuildId() != buildId) { + elf.reset(); + } + } + + YT_VERIFY(ObjectNameToElf_.emplace(objectName, elf).second); + } + + object.Elf = elf; + + if (elf) { + SearchAndCollectSymbolsFromELFSymbolTable(info, *elf, SHT_SYMTAB, ".strtab"); + } + + Objects_.push_back(std::move(object)); + } + + bool SearchAndCollectSymbolsFromELFSymbolTable( + dl_phdr_info* info, + const TElf& elf, + unsigned sectionHeaderType, + const char* stringTableName) + { + std::optional<TElf::TSection> symbolTable; + std::optional<TElf::TSection> stringTable; + + if (!elf.IterateSections([&] (const TElf::TSection& section, size_t) { + if (section.Header.sh_type == sectionHeaderType) { + symbolTable.emplace(section); + } else if (section.Header.sh_type == SHT_STRTAB && 0 == strcmp(section.Name(), stringTableName)) { + stringTable.emplace(section); + } + + return (symbolTable && stringTable); + })) { + return false; + } + + CollectSymbolsFromELFSymbolTable(info, elf, *symbolTable, *stringTable); + return true; + } + + void CollectSymbolsFromELFSymbolTable( + dl_phdr_info* info, + const TElf& elf, + const TElf::TSection& symbol_table, + const TElf::TSection& string_table) + { + const TElfSym* symbolTableEntry = reinterpret_cast<const TElfSym*>(symbol_table.begin()); + const TElfSym* symbolTableEnd = reinterpret_cast<const TElfSym*>(symbol_table.end()); + + const char* strings = string_table.begin(); + for (; symbolTableEntry < symbolTableEnd; ++symbolTableEntry) { + if (!symbolTableEntry->st_name || + !symbolTableEntry->st_value || + !symbolTableEntry->st_size || + strings + symbolTableEntry->st_name >= elf.end()) + { + continue; + } + + /// Find the name in strings table. + const auto* symbolName = strings + symbolTableEntry->st_name; + if (!symbolName) { + continue; + } + + Symbols_.push_back(TSymbolIndex::TSymbol{ + .AddressBegin = reinterpret_cast<const void*>(info->dlpi_addr + symbolTableEntry->st_value), + .AddressEnd = reinterpret_cast<const void *>(info->dlpi_addr + symbolTableEntry->st_value + symbolTableEntry->st_size), + .Name = symbolName, + }); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +TString GetVersion() +{ + return "0.3"; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TSymbolIndexSymbolizer +{ +public: + explicit TSymbolIndexSymbolizer(NProto::Profile* profile) + : Profile_(profile) + { } + + void Symbolize(bool filesOnly) + { + for (const auto& object : SymbolIndex_.Objects()) { + SymbolizeObject(&object); + } + + for (int index = 0; index < Profile_->location_size(); index++) { + auto location = Profile_->mutable_location(index); + void* ip = reinterpret_cast<void*>(location->address()); + + if (auto object = SymbolIndex_.FindObject(ip)) { + location->set_mapping_id(SymbolizeObject(object)); + } + } + + Profile_->add_comment(SymbolizeString("generated by ytprof " + GetVersion())); + + if (filesOnly) { + return; + } + + for (int index = 0; index < Profile_->function_size(); index++) { + auto function = Profile_->mutable_function(index); + + void* ip = reinterpret_cast<void*>(function->id()); + + if (auto symbol = SymbolIndex_.FindSymbol(ip)) { + function->set_name(SymbolizeString(CppDemangle(symbol->Name))); + function->set_system_name(SymbolizeString(symbol->Name)); + } else if (auto object = SymbolIndex_.FindObject(ip)) { + auto offset = reinterpret_cast<intptr_t>(ip) - reinterpret_cast<intptr_t>(object->AddressBegin); + auto filename = TFsPath{object->Name}.Basename(); + auto name = Sprintf("%p", reinterpret_cast<void*>(offset)) + "@" + filename; + + function->set_name(SymbolizeString(name)); + function->set_system_name(SymbolizeString(name)); + } + } + } + +private: + NProto::Profile* const Profile_; + + TSymbolIndex SymbolIndex_; + + THashMap<const TSymbolIndex::TObject*, ui64> Objects_; + THashMap<TString, ui64> Strings_; + + ui64 SymbolizeString(const TString& str) + { + auto it = Strings_.find(str); + if (it != Strings_.end()) { + return it->second; + } + + auto id = Profile_->string_table_size(); + Strings_[str] = id; + Profile_->add_string_table(str); + return id; + } + + ui64 SymbolizeObject(const TSymbolIndex::TObject* object) + { + auto it = Objects_.find(object); + if (it != Objects_.end()) { + return it->second; + } + + auto mapping = Profile_->add_mapping(); + mapping->set_id(Profile_->mapping_size()); + Objects_[object] = mapping->id(); + + mapping->set_memory_start(reinterpret_cast<ui64>(object->AddressBegin)); + mapping->set_memory_limit(reinterpret_cast<ui64>(object->AddressEnd)); + mapping->set_filename(SymbolizeString(object->Name)); + return mapping->id(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +static int OnVdsoPhdr(struct dl_phdr_info *info, size_t /* size */, void *data) +{ +#ifndef YT_NO_AUXV + auto vdso = (uintptr_t) getauxval(AT_SYSINFO_EHDR); +#else + uintptr_t vdso = 0; +#endif + auto vdsoRange = reinterpret_cast<std::pair<void*, void*>*>(data); + if (info->dlpi_addr == vdso) { + auto range = GetExecutableRange(info); + if (range) { + *vdsoRange = *range; + } + } + + return 0; +} + +std::pair<void*, void*> GetVdsoRange() +{ + std::pair<void*, void*> vdsoRange = {0, 0}; + dl_iterate_phdr(OnVdsoPhdr, &vdsoRange); + return vdsoRange; +} + +//////////////////////////////////////////////////////////////////////////////// + +static int OnBuildIdPhdr(struct dl_phdr_info *info, size_t /*size*/, void *data) +{ + auto buildId = reinterpret_cast<std::optional<TString>*>(data); + + TString objectName = info->dlpi_name; + if (objectName.empty()) { + *buildId = TSymbolIndex::GetBuildId(info); + } + + return 0; +} + +std::optional<TString> GetBuildId() +{ + std::optional<TString> buildId; + dl_iterate_phdr(OnBuildIdPhdr, &buildId); + return buildId; +} + +//////////////////////////////////////////////////////////////////////////////// + +void Symbolize(NProto::Profile* profile, bool filesOnly) +{ + TSymbolIndexSymbolizer symbolizer(profile); + symbolizer.Symbolize(filesOnly); + return; +} + +//////////////////////////////////////////////////////////////////////////////// + +void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo) +{ + auto addComment = [&] (const TString& comment) { + auto id = profile->string_table_size(); + profile->add_string_table(comment); + profile->add_comment(id); + }; + + if (!buildInfo.BinaryVersion.empty()) { + addComment("binary_version=" + buildInfo.BinaryVersion); + } + addComment("arc_revision=" + buildInfo.ArcRevision); + addComment("build_type=" + buildInfo.BuildType); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/symbolize.h b/yt/yt/library/ytprof/symbolize.h new file mode 100644 index 00000000000..6d99b56484a --- /dev/null +++ b/yt/yt/library/ytprof/symbolize.h @@ -0,0 +1,30 @@ +#pragma once + +#include <memory> +#include <optional> + +#include <util/generic/string.h> + +#include <yt/yt/library/ytprof/profile.pb.h> + +#include "build_info.h" + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +void Symbolize(NProto::Profile* profile, bool filesOnly = false); + +void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo); + +std::pair<void*, void*> GetVdsoRange(); + +// Returns current binary build id as binary string. +std::optional<TString> GetBuildId(); + +// Returns version of profiler library. +TString GetVersion(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/symbolize_other.cpp b/yt/yt/library/ytprof/symbolize_other.cpp new file mode 100644 index 00000000000..49f216fd977 --- /dev/null +++ b/yt/yt/library/ytprof/symbolize_other.cpp @@ -0,0 +1,35 @@ +#include "symbolize.h" +#include "util/system/compiler.h" + +namespace NYT::NYTProf { + +//////////////////////////////////////////////////////////////////////////////// + +void Symbolize(NProto::Profile* profile, bool filesOnly) +{ + Y_UNUSED(profile, filesOnly); +} + +std::pair<void*, void*> GetVdsoRange() +{ + return {nullptr, nullptr}; +} + +TString GetVersion() +{ + return "0.2"; +} + +void AddBuildInfo(NProto::Profile* profile, const TBuildInfo& buildInfo) +{ + Y_UNUSED(profile, buildInfo); +} + +std::optional<TString> GetBuildId() +{ + return {}; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTProf diff --git a/yt/yt/library/ytprof/ya.make b/yt/yt/library/ytprof/ya.make new file mode 100644 index 00000000000..79b92eb7153 --- /dev/null +++ b/yt/yt/library/ytprof/ya.make @@ -0,0 +1,74 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + profile.proto + signal_safe_profiler.cpp + cpu_profiler.cpp + heap_profiler.cpp + spinlock_profiler.cpp + profile.cpp + build_info.cpp + external_pprof.cpp +) + +IF (OS_LINUX) + SRCS(symbolize.cpp) +ELSE() + SRCS(symbolize_other.cpp) +ENDIF() + +PEERDIR( + library/cpp/yt/memory + library/cpp/yt/threading + library/cpp/yt/backtrace/cursors/interop + library/cpp/yt/backtrace/cursors/frame_pointer + library/cpp/yt/backtrace/cursors/libunwind + yt/yt/library/ytprof/api + contrib/libs/libunwind + contrib/libs/tcmalloc/malloc_extension + library/cpp/svnversion + yt/yt/core +) + +IF (NOT OPENSOURCE) + PEERDIR( + yt/yt/library/ytprof/bundle + ) +ENDIF() + +IF (OS_SDK == "local") + CXXFLAGS(-DYT_NO_AUXV) +ENDIF() + +IF (BUILD_TYPE == "PROFILE") + CXXFLAGS(-DYTPROF_PROFILE_BUILD) +ENDIF() + +CXXFLAGS(-DYTPROF_BUILD_TYPE='\"${BUILD_TYPE}\"') + +END() + +RECURSE( + http + example + bundle +) + +IF (OS_LINUX) + RECURSE( + integration + ) + + IF (NOT OPENSOURCE) + RECURSE( + benchmark + ) + ENDIF() + + IF (NOT SANITIZER_TYPE AND NOT YT_TEAMCITY) + RECURSE(unittests) + ENDIF() +ENDIF() + |