diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-22 15:26:41 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-22 15:26:41 +0300 |
commit | 03ae68528a1fca061195bac52f0484f6f54b2582 (patch) | |
tree | 4093239f0b89511e8ff2b29fabc76300f5ecd10a /library | |
parent | 250d29abfdc9a2526cac1e0b4b36c5b6e1d58e0c (diff) | |
download | ydb-03ae68528a1fca061195bac52f0484f6f54b2582.tar.gz |
intermediate changes
ref:e5b94b91d513ee8cc2d1610107a4e0b462b9c9db
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/events.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 10 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/packet.h | 9 | ||||
-rw-r--r-- | library/cpp/actors/wilson/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/wilson/protos/CMakeLists.txt | 9 | ||||
-rw-r--r-- | library/cpp/actors/wilson/protos/service.proto | 40 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_span.cpp | 7 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_trace.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.cpp | 100 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.h | 24 | ||||
-rw-r--r-- | library/python/testing/yatest_common/yatest/common/runtime.py | 6 |
12 files changed, 193 insertions, 22 deletions
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 702cf50fadf..f4a31da31bb 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -97,6 +97,7 @@ namespace NActors { InvokeResult, CoroTimeout, InvokeQuery, + Wilson, End, // Compatibility section diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 32f015af54c..f839741cb54 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -18,10 +18,8 @@ namespace NActors { return false; } - auto& span = *event.Span; - span.EndOk(); - const NWilson::TTraceId traceId(span); - event.Span.reset(); + const NWilson::TTraceId traceId(event.Span); + event.Span.EndOk(); LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); task.Orbit.Take(event.Orbit); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e48d294420b..25e996ddcae 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -59,13 +59,11 @@ namespace NActors { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, - NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue"); - if (*event.Span) { - auto& span = *event.Span; - span + if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, + NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) { + event.Span = std::move(span .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) - .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize))); } return std::make_pair(bytes, &event); } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 3a6aadfb9f8..9161e3af710 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -117,7 +117,7 @@ struct TEventHolder : TNonCopyable { ui32 EventSerializedSize; ui32 EventActuallySerialized; mutable NLWTrace::TOrbit Orbit; - std::optional<NWilson::TSpan> Span; + NWilson::TSpan Span; ui32 Fill(IEventHandle& ev); @@ -136,9 +136,10 @@ struct TEventHolder : TNonCopyable { const TActorId& r = d.Recipient; const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; + Span.EndError("nondelivery"); auto ev = Event - ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, NWilson::TTraceId(d.TraceId)) - : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, NWilson::TTraceId(d.TraceId)); + ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span) + : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span); NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure)); } @@ -146,7 +147,7 @@ struct TEventHolder : TNonCopyable { Event.Reset(); Buffer.Reset(); Orbit.Reset(); - Span.reset(); + Span = {}; } }; diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 65b2e32d87e..75c7b16dff8 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -16,4 +16,5 @@ target_link_libraries(cpp-actors-wilson PUBLIC ) target_sources(cpp-actors-wilson PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_uploader.cpp ) diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt index a7cc2e94ffd..f61a5ee58d7 100644 --- a/library/cpp/actors/wilson/protos/CMakeLists.txt +++ b/library/cpp/actors/wilson/protos/CMakeLists.txt @@ -8,14 +8,19 @@ add_library(actors-wilson-protos) +set_property(TARGET actors-wilson-protos PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) target_link_libraries(actors-wilson-protos PUBLIC contrib-libs-cxxsupp yutil + contrib-libs-grpc contrib-libs-protobuf ) target_proto_messages(actors-wilson-protos PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/service.proto ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto ) target_proto_addincls(actors-wilson-protos @@ -31,3 +36,7 @@ target_proto_outs(actors-wilson-protos --cpp_out=${CMAKE_BINARY_DIR}/ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ ) +target_proto_plugin(actors-wilson-protos + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/actors/wilson/protos/service.proto b/library/cpp/actors/wilson/protos/service.proto new file mode 100644 index 00000000000..7a40af39c82 --- /dev/null +++ b/library/cpp/actors/wilson/protos/service.proto @@ -0,0 +1,40 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.collector.trace.v1; + +import "library/cpp/actors/wilson/protos/trace.proto"; + +// Service that can be used to push spans between one Application instrumented with +// OpenTelemetry and a collector, or between a collector and a central collector (in this +// case spans are sent/received to/from multiple Applications). +service TraceService { + // For performance reasons, it is recommended to keep this RPC + // alive for the entire life of the application. + rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {} +} + +message ExportTraceServiceRequest { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain one + // element. Intermediary nodes (such as OpenTelemetry Collector) that receive + // data from multiple origins typically batch the data before forwarding further and + // in that case this array will contain multiple elements. + repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1; +} + +message ExportTraceServiceResponse { +} diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index 6b6ea03ccf6..a97184c3f09 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -1,4 +1,5 @@ #include "wilson_span.h" +#include "wilson_uploader.h" #include <library/cpp/actors/core/log.h> #include <google/protobuf/text_format.h> @@ -53,11 +54,7 @@ namespace NWilson { void TSpan::Send() { if (TlsActivationContext) { - NProtoBuf::TextFormat::Printer p; - p.SetSingleLineMode(true); - TString s; - p.PrintToString(Data->Span, &s); - LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s); + TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); } } diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index 9937c1f8075..9e4ea7deb4d 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -97,7 +97,7 @@ namespace NWilson { Raw = p[2]; } - void Serialize(TSerializedTraceId* out) const { + void Serialize(TSerializedTraceId *out) const { auto p = reinterpret_cast<ui64*>(*out); p[0] = TraceId[0]; p[1] = TraceId[1]; diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp new file mode 100644 index 00000000000..e87776717bb --- /dev/null +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -0,0 +1,100 @@ +#include "wilson_uploader.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/wilson/protos/service.pb.h> +#include <library/cpp/actors/wilson/protos/service.grpc.pb.h> +#include <util/stream/file.h> +#include <grpc++/grpc++.h> +#include <chrono> + +namespace NWilson { + + using namespace NActors; + + namespace NServiceProto = opentelemetry::proto::collector::trace::v1; + + namespace { + + class TWilsonUploader + : public TActorBootstrapped<TWilsonUploader> + { + TString Host; + ui16 Port; + TString RootCA; + + std::shared_ptr<grpc::Channel> Channel; + std::unique_ptr<NServiceProto::TraceService::Stub> Stub; + grpc::CompletionQueue CQ; + + std::unique_ptr<grpc::ClientContext> Context; + std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader; + NServiceProto::ExportTraceServiceRequest Request; + NServiceProto::ExportTraceServiceResponse Response; + grpc::Status Status; + + public: + TWilsonUploader(TString host, ui16 port, TString rootCA) + : Host(std::move(host)) + , Port(std::move(port)) + , RootCA(std::move(rootCA)) + {} + + ~TWilsonUploader() { + CQ.Shutdown(); + } + + void Bootstrap() { + Become(&TThis::StateFunc); + + Channel = grpc::CreateChannel(TStringBuilder() << Host << ":" << Port, grpc::SslCredentials({ + .pem_root_certs = TFileInput(RootCA).ReadAll(), + })); + Stub = NServiceProto::TraceService::NewStub(Channel); + } + + void Handle(TEvWilson::TPtr ev) { + CheckIfDone(); + + auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans(); + auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans(); + ev->Get()->Span.Swap(sspan->add_spans()); + + if (!Context) { + SendRequest(); + } + } + + void SendRequest() { + Y_VERIFY(!Reader && !Context); + Context = std::make_unique<grpc::ClientContext>(); + Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ); + Reader->Finish(&Response, &Status, nullptr); + } + + void CheckIfDone() { + if (Context) { + void *tag; + bool ok; + if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { + Reader.reset(); + Context.reset(); + + if (Request.resource_spans_size()) { + SendRequest(); + } + } + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvWilson, Handle); + ); + }; + + } // anonymous + + IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA) { + return new TWilsonUploader(std::move(host), port, std::move(rootCA)); + } + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_uploader.h b/library/cpp/actors/wilson/wilson_uploader.h new file mode 100644 index 00000000000..b6a65aadd72 --- /dev/null +++ b/library/cpp/actors/wilson/wilson_uploader.h @@ -0,0 +1,24 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/wilson/protos/trace.pb.h> + +namespace NWilson { + + struct TEvWilson : NActors::TEventLocal<TEvWilson, NActors::TEvents::TSystem::Wilson> { + opentelemetry::proto::trace::v1::Span Span; + + TEvWilson(opentelemetry::proto::trace::v1::Span *span) { + Span.Swap(span); + } + }; + + inline NActors::TActorId MakeWilsonUploaderId() { + return NActors::TActorId(0, TStringBuf("WilsonUpload", 12)); + } + + NActors::IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA); + +} // NWilson diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py index d5aba763158..43e5c5b0f62 100644 --- a/library/python/testing/yatest_common/yatest/common/runtime.py +++ b/library/python/testing/yatest_common/yatest/common/runtime.py @@ -131,7 +131,8 @@ def ram_drive_path(path=None): """ if 'YA_TEST_RAM_DRIVE_PATH' in os.environ: return _join_path(os.environ['YA_TEST_RAM_DRIVE_PATH'], path) - return get_param("ram_drive_path") + elif get_param("ram_drive_path"): + return _join_path(get_param("ram_drive_path"), path) def output_ram_drive_path(path=None): @@ -142,7 +143,8 @@ def output_ram_drive_path(path=None): """ if 'YA_TEST_OUTPUT_RAM_DRIVE_PATH' in os.environ: return _join_path(os.environ['YA_TEST_OUTPUT_RAM_DRIVE_PATH'], path) - return _get_ya_plugin_instance().get_context("test_output_ram_drive_path") + elif _get_ya_plugin_instance().get_context("test_output_ram_drive_path"): + return _join_path(_get_ya_plugin_instance().get_context("test_output_ram_drive_path"), path) def binary_path(path=None): |