diff options
| author | arcadia-devtools <[email protected]> | 2022-06-22 15:26:41 +0300 | 
|---|---|---|
| committer | arcadia-devtools <[email protected]> | 2022-06-22 15:26:41 +0300 | 
| commit | 03ae68528a1fca061195bac52f0484f6f54b2582 (patch) | |
| tree | 4093239f0b89511e8ff2b29fabc76300f5ecd10a /library/cpp | |
| parent | 250d29abfdc9a2526cac1e0b4b36c5b6e1d58e0c (diff) | |
intermediate changes
ref:e5b94b91d513ee8cc2d1610107a4e0b462b9c9db
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/actors/core/events.h | 1 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 6 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 10 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/packet.h | 9 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/protos/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/protos/service.proto | 40 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/wilson_span.cpp | 7 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/wilson_trace.h | 2 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.cpp | 100 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.h | 24 | 
11 files changed, 189 insertions, 20 deletions
| diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 702cf50fadf..f4a31da31bb 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -97,6 +97,7 @@ namespace NActors {                  InvokeResult,                  CoroTimeout,                  InvokeQuery, +                Wilson,                  End,                  // Compatibility section diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 32f015af54c..f839741cb54 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -18,10 +18,8 @@ namespace NActors {              return false;          } -        auto& span = *event.Span; -        span.EndOk(); -        const NWilson::TTraceId traceId(span); -        event.Span.reset(); +        const NWilson::TTraceId traceId(event.Span); +        event.Span.EndOk();          LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());          task.Orbit.Take(event.Orbit); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e48d294420b..25e996ddcae 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -59,13 +59,11 @@ namespace NActors {              TEventHolder& event = Pool.Allocate(Queue);              const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));              OutputQueueSize += bytes; -            event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, -                NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue"); -            if (*event.Span) { -                auto& span = *event.Span; -                span +            if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, +                    NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) { +                event.Span = std::move(span                      .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) -                    .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); +                    .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)));              }              return std::make_pair(bytes, &event);          } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 3a6aadfb9f8..9161e3af710 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -117,7 +117,7 @@ struct TEventHolder : TNonCopyable {      ui32 EventSerializedSize;      ui32 EventActuallySerialized;      mutable NLWTrace::TOrbit Orbit; -    std::optional<NWilson::TSpan> Span; +    NWilson::TSpan Span;      ui32 Fill(IEventHandle& ev); @@ -136,9 +136,10 @@ struct TEventHolder : TNonCopyable {          const TActorId& r = d.Recipient;          const TActorId& s = d.Sender;          const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; +        Span.EndError("nondelivery");          auto ev = Event -            ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, NWilson::TTraceId(d.TraceId)) -            : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, NWilson::TTraceId(d.TraceId)); +            ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span) +            : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span);          NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));      } @@ -146,7 +147,7 @@ struct TEventHolder : TNonCopyable {          Event.Reset();          Buffer.Reset();          Orbit.Reset(); -        Span.reset(); +        Span = {};      }  }; diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 65b2e32d87e..75c7b16dff8 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -16,4 +16,5 @@ target_link_libraries(cpp-actors-wilson PUBLIC  )  target_sources(cpp-actors-wilson PRIVATE    ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_uploader.cpp  ) diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt index a7cc2e94ffd..f61a5ee58d7 100644 --- a/library/cpp/actors/wilson/protos/CMakeLists.txt +++ b/library/cpp/actors/wilson/protos/CMakeLists.txt @@ -8,14 +8,19 @@  add_library(actors-wilson-protos) +set_property(TARGET actors-wilson-protos PROPERTY +  PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +)  target_link_libraries(actors-wilson-protos PUBLIC    contrib-libs-cxxsupp    yutil +  contrib-libs-grpc    contrib-libs-protobuf  )  target_proto_messages(actors-wilson-protos PRIVATE    ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto    ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto +  ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/service.proto    ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto  )  target_proto_addincls(actors-wilson-protos @@ -31,3 +36,7 @@ target_proto_outs(actors-wilson-protos    --cpp_out=${CMAKE_BINARY_DIR}/    --cpp_styleguide_out=${CMAKE_BINARY_DIR}/  ) +target_proto_plugin(actors-wilson-protos +  grpc_cpp +  grpc_cpp +) diff --git a/library/cpp/actors/wilson/protos/service.proto b/library/cpp/actors/wilson/protos/service.proto new file mode 100644 index 00000000000..7a40af39c82 --- /dev/null +++ b/library/cpp/actors/wilson/protos/service.proto @@ -0,0 +1,40 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +//     http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.collector.trace.v1; + +import "library/cpp/actors/wilson/protos/trace.proto"; + +// Service that can be used to push spans between one Application instrumented with +// OpenTelemetry and a collector, or between a collector and a central collector (in this +// case spans are sent/received to/from multiple Applications). +service TraceService { +  // For performance reasons, it is recommended to keep this RPC +  // alive for the entire life of the application. +  rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {} +} + +message ExportTraceServiceRequest { +  // An array of ResourceSpans. +  // For data coming from a single resource this array will typically contain one +  // element. Intermediary nodes (such as OpenTelemetry Collector) that receive +  // data from multiple origins typically batch the data before forwarding further and +  // in that case this array will contain multiple elements. +  repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1; +} + +message ExportTraceServiceResponse { +} diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index 6b6ea03ccf6..a97184c3f09 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -1,4 +1,5 @@  #include "wilson_span.h" +#include "wilson_uploader.h"  #include <library/cpp/actors/core/log.h>  #include <google/protobuf/text_format.h> @@ -53,11 +54,7 @@ namespace NWilson {      void TSpan::Send() {          if (TlsActivationContext) { -            NProtoBuf::TextFormat::Printer p; -            p.SetSingleLineMode(true); -            TString s; -            p.PrintToString(Data->Span, &s); -            LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s); +            TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));          }      } diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index 9937c1f8075..9e4ea7deb4d 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -97,7 +97,7 @@ namespace NWilson {              Raw = p[2];          } -        void Serialize(TSerializedTraceId* out) const { +        void Serialize(TSerializedTraceId *out) const {              auto p = reinterpret_cast<ui64*>(*out);              p[0] = TraceId[0];              p[1] = TraceId[1]; diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp new file mode 100644 index 00000000000..e87776717bb --- /dev/null +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -0,0 +1,100 @@ +#include "wilson_uploader.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/wilson/protos/service.pb.h> +#include <library/cpp/actors/wilson/protos/service.grpc.pb.h> +#include <util/stream/file.h> +#include <grpc++/grpc++.h> +#include <chrono> + +namespace NWilson { + +    using namespace NActors; + +    namespace NServiceProto = opentelemetry::proto::collector::trace::v1; + +    namespace { + +        class TWilsonUploader +            : public TActorBootstrapped<TWilsonUploader> +        { +            TString Host; +            ui16 Port; +            TString RootCA; + +            std::shared_ptr<grpc::Channel> Channel; +            std::unique_ptr<NServiceProto::TraceService::Stub> Stub; +            grpc::CompletionQueue CQ; + +            std::unique_ptr<grpc::ClientContext> Context; +            std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader; +            NServiceProto::ExportTraceServiceRequest Request; +            NServiceProto::ExportTraceServiceResponse Response; +            grpc::Status Status; + +        public: +            TWilsonUploader(TString host, ui16 port, TString rootCA) +                : Host(std::move(host)) +                , Port(std::move(port)) +                , RootCA(std::move(rootCA)) +            {} + +            ~TWilsonUploader() { +                CQ.Shutdown(); +            } + +            void Bootstrap() { +                Become(&TThis::StateFunc); + +                Channel = grpc::CreateChannel(TStringBuilder() << Host << ":" << Port, grpc::SslCredentials({ +                    .pem_root_certs = TFileInput(RootCA).ReadAll(), +                })); +                Stub = NServiceProto::TraceService::NewStub(Channel); +            } + +            void Handle(TEvWilson::TPtr ev) { +                CheckIfDone(); + +                auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans(); +                auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans(); +                ev->Get()->Span.Swap(sspan->add_spans()); + +                if (!Context) { +                    SendRequest(); +                } +            } + +            void SendRequest() { +                Y_VERIFY(!Reader && !Context); +                Context = std::make_unique<grpc::ClientContext>(); +                Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ); +                Reader->Finish(&Response, &Status, nullptr); +            } + +            void CheckIfDone() { +                if (Context) { +                    void *tag; +                    bool ok; +                    if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { +                        Reader.reset(); +                        Context.reset(); + +                        if (Request.resource_spans_size()) { +                            SendRequest(); +                        } +                    } +                } +            } + +            STRICT_STFUNC(StateFunc, +                hFunc(TEvWilson, Handle); +            ); +        }; + +    } // anonymous + +    IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA) { +        return new TWilsonUploader(std::move(host), port, std::move(rootCA)); +    } + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_uploader.h b/library/cpp/actors/wilson/wilson_uploader.h new file mode 100644 index 00000000000..b6a65aadd72 --- /dev/null +++ b/library/cpp/actors/wilson/wilson_uploader.h @@ -0,0 +1,24 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/wilson/protos/trace.pb.h> + +namespace NWilson { + +    struct TEvWilson : NActors::TEventLocal<TEvWilson, NActors::TEvents::TSystem::Wilson> { +        opentelemetry::proto::trace::v1::Span Span; + +        TEvWilson(opentelemetry::proto::trace::v1::Span *span) { +            Span.Swap(span); +        } +    }; + +    inline NActors::TActorId MakeWilsonUploaderId() { +        return NActors::TActorId(0, TStringBuf("WilsonUpload", 12)); +    } + +    NActors::IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA); + +} // NWilson | 
