aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-22 15:26:41 +0300
committerarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-22 15:26:41 +0300
commit03ae68528a1fca061195bac52f0484f6f54b2582 (patch)
tree4093239f0b89511e8ff2b29fabc76300f5ecd10a /library
parent250d29abfdc9a2526cac1e0b4b36c5b6e1d58e0c (diff)
downloadydb-03ae68528a1fca061195bac52f0484f6f54b2582.tar.gz
intermediate changes
ref:e5b94b91d513ee8cc2d1610107a4e0b462b9c9db
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/events.h1
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h10
-rw-r--r--library/cpp/actors/interconnect/packet.h9
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt1
-rw-r--r--library/cpp/actors/wilson/protos/CMakeLists.txt9
-rw-r--r--library/cpp/actors/wilson/protos/service.proto40
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp7
-rw-r--r--library/cpp/actors/wilson/wilson_trace.h2
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp100
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.h24
-rw-r--r--library/python/testing/yatest_common/yatest/common/runtime.py6
12 files changed, 193 insertions, 22 deletions
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 702cf50fadf..f4a31da31bb 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -97,6 +97,7 @@ namespace NActors {
InvokeResult,
CoroTimeout,
InvokeQuery,
+ Wilson,
End,
// Compatibility section
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 32f015af54c..f839741cb54 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -18,10 +18,8 @@ namespace NActors {
return false;
}
- auto& span = *event.Span;
- span.EndOk();
- const NWilson::TTraceId traceId(span);
- event.Span.reset();
+ const NWilson::TTraceId traceId(event.Span);
+ event.Span.EndOk();
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
task.Orbit.Take(event.Orbit);
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e48d294420b..25e996ddcae 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -59,13 +59,11 @@ namespace NActors {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
- NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue");
- if (*event.Span) {
- auto& span = *event.Span;
- span
+ if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
+ NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) {
+ event.Span = std::move(span
.Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
- .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)));
}
return std::make_pair(bytes, &event);
}
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 3a6aadfb9f8..9161e3af710 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -117,7 +117,7 @@ struct TEventHolder : TNonCopyable {
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
mutable NLWTrace::TOrbit Orbit;
- std::optional<NWilson::TSpan> Span;
+ NWilson::TSpan Span;
ui32 Fill(IEventHandle& ev);
@@ -136,9 +136,10 @@ struct TEventHolder : TNonCopyable {
const TActorId& r = d.Recipient;
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
+ Span.EndError("nondelivery");
auto ev = Event
- ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, NWilson::TTraceId(d.TraceId))
- : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, NWilson::TTraceId(d.TraceId));
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span)
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span);
NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
}
@@ -146,7 +147,7 @@ struct TEventHolder : TNonCopyable {
Event.Reset();
Buffer.Reset();
Orbit.Reset();
- Span.reset();
+ Span = {};
}
};
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 65b2e32d87e..75c7b16dff8 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -16,4 +16,5 @@ target_link_libraries(cpp-actors-wilson PUBLIC
)
target_sources(cpp-actors-wilson PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_uploader.cpp
)
diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt
index a7cc2e94ffd..f61a5ee58d7 100644
--- a/library/cpp/actors/wilson/protos/CMakeLists.txt
+++ b/library/cpp/actors/wilson/protos/CMakeLists.txt
@@ -8,14 +8,19 @@
add_library(actors-wilson-protos)
+set_property(TARGET actors-wilson-protos PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
target_link_libraries(actors-wilson-protos PUBLIC
contrib-libs-cxxsupp
yutil
+ contrib-libs-grpc
contrib-libs-protobuf
)
target_proto_messages(actors-wilson-protos PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/service.proto
${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto
)
target_proto_addincls(actors-wilson-protos
@@ -31,3 +36,7 @@ target_proto_outs(actors-wilson-protos
--cpp_out=${CMAKE_BINARY_DIR}/
--cpp_styleguide_out=${CMAKE_BINARY_DIR}/
)
+target_proto_plugin(actors-wilson-protos
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/actors/wilson/protos/service.proto b/library/cpp/actors/wilson/protos/service.proto
new file mode 100644
index 00000000000..7a40af39c82
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/service.proto
@@ -0,0 +1,40 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.collector.trace.v1;
+
+import "library/cpp/actors/wilson/protos/trace.proto";
+
+// Service that can be used to push spans between one Application instrumented with
+// OpenTelemetry and a collector, or between a collector and a central collector (in this
+// case spans are sent/received to/from multiple Applications).
+service TraceService {
+ // For performance reasons, it is recommended to keep this RPC
+ // alive for the entire life of the application.
+ rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {}
+}
+
+message ExportTraceServiceRequest {
+ // An array of ResourceSpans.
+ // For data coming from a single resource this array will typically contain one
+ // element. Intermediary nodes (such as OpenTelemetry Collector) that receive
+ // data from multiple origins typically batch the data before forwarding further and
+ // in that case this array will contain multiple elements.
+ repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1;
+}
+
+message ExportTraceServiceResponse {
+}
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index 6b6ea03ccf6..a97184c3f09 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -1,4 +1,5 @@
#include "wilson_span.h"
+#include "wilson_uploader.h"
#include <library/cpp/actors/core/log.h>
#include <google/protobuf/text_format.h>
@@ -53,11 +54,7 @@ namespace NWilson {
void TSpan::Send() {
if (TlsActivationContext) {
- NProtoBuf::TextFormat::Printer p;
- p.SetSingleLineMode(true);
- TString s;
- p.PrintToString(Data->Span, &s);
- LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s);
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
}
}
diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h
index 9937c1f8075..9e4ea7deb4d 100644
--- a/library/cpp/actors/wilson/wilson_trace.h
+++ b/library/cpp/actors/wilson/wilson_trace.h
@@ -97,7 +97,7 @@ namespace NWilson {
Raw = p[2];
}
- void Serialize(TSerializedTraceId* out) const {
+ void Serialize(TSerializedTraceId *out) const {
auto p = reinterpret_cast<ui64*>(*out);
p[0] = TraceId[0];
p[1] = TraceId[1];
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
new file mode 100644
index 00000000000..e87776717bb
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -0,0 +1,100 @@
+#include "wilson_uploader.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/wilson/protos/service.pb.h>
+#include <library/cpp/actors/wilson/protos/service.grpc.pb.h>
+#include <util/stream/file.h>
+#include <grpc++/grpc++.h>
+#include <chrono>
+
+namespace NWilson {
+
+ using namespace NActors;
+
+ namespace NServiceProto = opentelemetry::proto::collector::trace::v1;
+
+ namespace {
+
+ class TWilsonUploader
+ : public TActorBootstrapped<TWilsonUploader>
+ {
+ TString Host;
+ ui16 Port;
+ TString RootCA;
+
+ std::shared_ptr<grpc::Channel> Channel;
+ std::unique_ptr<NServiceProto::TraceService::Stub> Stub;
+ grpc::CompletionQueue CQ;
+
+ std::unique_ptr<grpc::ClientContext> Context;
+ std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
+ NServiceProto::ExportTraceServiceRequest Request;
+ NServiceProto::ExportTraceServiceResponse Response;
+ grpc::Status Status;
+
+ public:
+ TWilsonUploader(TString host, ui16 port, TString rootCA)
+ : Host(std::move(host))
+ , Port(std::move(port))
+ , RootCA(std::move(rootCA))
+ {}
+
+ ~TWilsonUploader() {
+ CQ.Shutdown();
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateFunc);
+
+ Channel = grpc::CreateChannel(TStringBuilder() << Host << ":" << Port, grpc::SslCredentials({
+ .pem_root_certs = TFileInput(RootCA).ReadAll(),
+ }));
+ Stub = NServiceProto::TraceService::NewStub(Channel);
+ }
+
+ void Handle(TEvWilson::TPtr ev) {
+ CheckIfDone();
+
+ auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans();
+ auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans();
+ ev->Get()->Span.Swap(sspan->add_spans());
+
+ if (!Context) {
+ SendRequest();
+ }
+ }
+
+ void SendRequest() {
+ Y_VERIFY(!Reader && !Context);
+ Context = std::make_unique<grpc::ClientContext>();
+ Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ);
+ Reader->Finish(&Response, &Status, nullptr);
+ }
+
+ void CheckIfDone() {
+ if (Context) {
+ void *tag;
+ bool ok;
+ if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
+ Reader.reset();
+ Context.reset();
+
+ if (Request.resource_spans_size()) {
+ SendRequest();
+ }
+ }
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvWilson, Handle);
+ );
+ };
+
+ } // anonymous
+
+ IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA) {
+ return new TWilsonUploader(std::move(host), port, std::move(rootCA));
+ }
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_uploader.h b/library/cpp/actors/wilson/wilson_uploader.h
new file mode 100644
index 00000000000..b6a65aadd72
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_uploader.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/wilson/protos/trace.pb.h>
+
+namespace NWilson {
+
+ struct TEvWilson : NActors::TEventLocal<TEvWilson, NActors::TEvents::TSystem::Wilson> {
+ opentelemetry::proto::trace::v1::Span Span;
+
+ TEvWilson(opentelemetry::proto::trace::v1::Span *span) {
+ Span.Swap(span);
+ }
+ };
+
+ inline NActors::TActorId MakeWilsonUploaderId() {
+ return NActors::TActorId(0, TStringBuf("WilsonUpload", 12));
+ }
+
+ NActors::IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA);
+
+} // NWilson
diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py
index d5aba763158..43e5c5b0f62 100644
--- a/library/python/testing/yatest_common/yatest/common/runtime.py
+++ b/library/python/testing/yatest_common/yatest/common/runtime.py
@@ -131,7 +131,8 @@ def ram_drive_path(path=None):
"""
if 'YA_TEST_RAM_DRIVE_PATH' in os.environ:
return _join_path(os.environ['YA_TEST_RAM_DRIVE_PATH'], path)
- return get_param("ram_drive_path")
+ elif get_param("ram_drive_path"):
+ return _join_path(get_param("ram_drive_path"), path)
def output_ram_drive_path(path=None):
@@ -142,7 +143,8 @@ def output_ram_drive_path(path=None):
"""
if 'YA_TEST_OUTPUT_RAM_DRIVE_PATH' in os.environ:
return _join_path(os.environ['YA_TEST_OUTPUT_RAM_DRIVE_PATH'], path)
- return _get_ya_plugin_instance().get_context("test_output_ram_drive_path")
+ elif _get_ya_plugin_instance().get_context("test_output_ram_drive_path"):
+ return _join_path(_get_ya_plugin_instance().get_context("test_output_ram_drive_path"), path)
def binary_path(path=None):