diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-01-04 19:45:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-04 19:45:49 +0100 |
commit | cc6cf5ce9a4267b3eacb411407625a0731acb887 (patch) | |
tree | ea53500f0b6d74412fab557ef2df28ae2229ca98 | |
parent | 30e973621a71ea7226bce396b036b546802d6425 (diff) | |
download | ydb-cc6cf5ce9a4267b3eacb411407625a0731acb887.tar.gz |
trace(kqp): add tracing ro read actors (#841)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 71 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_trace.cpp | 64 | ||||
-rw-r--r-- | ydb/library/wilson_ids/wilson.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 6 |
9 files changed, 141 insertions, 41 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index a3058abc1c..407974c586 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -349,7 +349,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit); auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, - AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); + AppData()->FunctionRegistry, settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); if (optimizeProtoForLocalExecution) { TVector<google::protobuf::Message*>& taskSourceSettings = static_cast<TKqpComputeActor*>(computeActor)->MutableTaskSourceSettings(); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2d2eca1845..9d1815092f 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -17,6 +17,7 @@ #include <library/cpp/threading/hot_swap/hot_swap.h> #include <ydb/library/actors/core/interconnect.h> #include <ydb/library/actors/core/actorsystem.h> +#include <ydb/library/wilson_ids/wilson.h> #include <util/generic/intrlist.h> @@ -399,6 +400,7 @@ public: , Counters(counters) , UseFollowers(false) , PipeCacheId(MainPipeCacheId) + , ReadActorSpan(TWilsonKqp::ReadActor, NWilson::TTraceId(args.TraceId), "ReadActor") { Y_ABORT_UNLESS(Arena); Y_ABORT_UNLESS(settings->GetArena() == Arena->Get()); @@ -569,6 +571,9 @@ public: ResolveShards[ResolveShardId] = state; ResolveShardId += 1; + ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(), + "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } @@ -617,9 +622,13 @@ public: } } + ReadActorStateSpan.EndError(error); + return RuntimeError(error, statusCode); } + ReadActorStateSpan.EndOk(); + auto keyDesc = std::move(request->ResultSet[0].KeyDescription); if (keyDesc->GetPartitions().size() == 1) { @@ -896,10 +905,8 @@ public: Counters->CreatedIterators->Inc(); ReadIdByTabletId[state->TabletId].push_back(id); - NWilson::TTraceId traceId; // TODO: get traceId from kqp. - Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), - IEventHandle::FlagTrackDelivery, 0, std::move(traceId)); + IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId()); if (!FirstShardStarted) { state->IsFirst = true; @@ -1385,6 +1392,8 @@ public: } } TBase::PassAway(); + + ReadActorSpan.End(); } void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) { @@ -1395,6 +1404,11 @@ public: NYql::TIssues issues; issues.AddIssue(std::move(issue)); + + if (ReadActorSpan) { + ReadActorSpan.EndError(issues.ToOneLineString()); + } + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); } @@ -1491,6 +1505,9 @@ private: size_t TotalRetries = 0; bool FirstShardStarted = false; + + NWilson::TSpan ReadActorSpan; + NWilson::TSpan ReadActorStateSpan; }; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 6834a048c4..b51d0d3b7f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -1,19 +1,20 @@ #include "kqp_stream_lookup_actor.h" -#include <ydb/library/actors/core/actor_bootstrapped.h> - #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/common/kqp_event_ids.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> +#include <ydb/core/kqp/runtime/kqp_scan_data.h> +#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h> #include <ydb/core/protos/kqp_stats.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include <ydb/core/kqp/common/kqp_event_ids.h> + +#include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> -#include <ydb/core/kqp/runtime/kqp_scan_data.h> -#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h> +#include <ydb/library/wilson_ids/wilson.h> namespace NKikimr { namespace NKqp { @@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10; class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput { public: - TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, - const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, - const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings, + TKqpStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) - : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId) - , InputIndex(inputIndex) - , Input(input) - , ComputeActorId(computeActorId) - , TypeEnv(typeEnv) - , Alloc(alloc) + : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << args.InputIndex << ", CA Id " << args.ComputeActorId) + , InputIndex(args.InputIndex) + , Input(args.TransformInput) + , ComputeActorId(args.ComputeActorId) + , TypeEnv(args.TypeEnv) + , Alloc(args.Alloc) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) - , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc)) + , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) , Counters(counters) + , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor") { - IngressStats.Level = statsLevel; + IngressStats.Level = args.StatsLevel; } virtual ~TKqpStreamLookupActor() { @@ -174,6 +173,8 @@ private: Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); TActorBootstrapped<TKqpStreamLookupActor>::PassAway(); + + LookupActorSpan.End(); } i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { @@ -234,10 +235,15 @@ private: void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath()); if (ev->Get()->Request->ErrorCount > 0) { - return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " - << StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR); + TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " + << StreamLookupWorker->GetTablePath(); + LookupActorStateSpan.EndError(errorMsg); + + return RuntimeError(errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR); } + LookupActorStateSpan.EndOk(); + auto& resultSet = ev->Get()->Request->ResultSet; YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)"); Partitioning = resultSet[0].KeyDescription->Partitioning; @@ -342,8 +348,11 @@ private: << " was resolved: " << !!Partitioning); if (!Partitioning) { - RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath() - << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); + TString errorMsg = TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath() + << " (request timeout exceeded)"; + LookupActorStateSpan.EndError(errorMsg); + + RuntimeError(errorMsg, NYql::NDqProto::StatusIds::TIMEOUT); } } @@ -392,7 +401,7 @@ private: record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true), - IEventHandle::FlagTrackDelivery); + IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId()); read.State = EReadState::Running; @@ -438,6 +447,9 @@ private: keyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); Counters->IteratorsShardResolve->Inc(); + LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), + "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); @@ -467,6 +479,11 @@ private: NYql::TIssues issues; issues.AddIssue(std::move(issue)); + + if (LookupActorSpan) { + LookupActorSpan.EndError(issues.ToOneLineString()); + } + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); } @@ -495,17 +512,15 @@ private: ui64 ReadBytesCount = 0; TIntrusivePtr<TKqpCounters> Counters; + NWilson::TSpan LookupActorSpan; + NWilson::TSpan LookupActorStateSpan; }; } // namespace -std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, - NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc, +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) { - auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory, - alloc, inputDesc, std::move(settings), counters); + auto actor = new TKqpStreamLookupActor(std::move(args), std::move(settings), counters); return {actor, actor}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h index 7e8e266dbc..cb1c1ba8c2 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -7,10 +7,7 @@ namespace NKikimr { namespace NKqp { -std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, - NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc, +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>); } // namespace NKqp diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp index ddcd068a20..d244303fd9 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -7,8 +7,7 @@ namespace NKqp { void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) { factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { - return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv, - args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters); + return CreateStreamLookupActor(std::move(args), std::move(settings), counters); }); } diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 8438c1db04..2b377de086 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -364,7 +364,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { } std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , " + ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " @@ -377,6 +377,68 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } + Y_UNIT_TEST(TestTraceDistributedSelectViaReadActors) { + auto [runtime, server, sender] = TestCreateServer(); + + CreateShardedTable(server, sender, "/Root", "table-1", 1, false); + + FakeWilsonUploader* uploader = new FakeWilsonUploader(); + TActorId uploaderId = runtime.Register(uploader, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.SimulateSleep(TDuration::Seconds(10)); + + SplitTable(runtime, server, 5); + + ExecSQL( + server, + sender, + "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);", + true, + Ydb::StatusIds::SUCCESS + ); + + ExecSQL( + server, + sender, + "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);", + true, + Ydb::StatusIds::SUCCESS + ); + + NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095); + + ExecSQL( + server, + sender, + "SELECT * FROM `/Root/table-1`;", + true, + Ydb::StatusIds::SUCCESS, + std::move(traceId) + ); + + uploader->BuildTraceTrees(); + + UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); + + FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second; + + auto readActorSpan = trace.Root.BFSFindOne("ReadActor"); + UNIT_ASSERT(readActorSpan); + + auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard. + UNIT_ASSERT_EQUAL(dsReads.size(), 2); + + std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " + "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , " + "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " + "(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " + "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , " + "(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " + "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " + "[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])"; + UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); + } + Y_UNIT_TEST(TestTraceWriteImmediateOnShard) { auto [runtime, server, sender] = TestCreateServer(); diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h index ba73508a43..32cfce0699 100644 --- a/ydb/library/wilson_ids/wilson.h +++ b/ydb/library/wilson_ids/wilson.h @@ -36,6 +36,12 @@ namespace NKikimr { ProposeTransaction = 9, ComputeActor = 9, + + ReadActor = 9, + ReadActorShardsResolve = 10, + + LookupActor = 9, + LookupActorShardsResolve = 10, }; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index f1fd8dddf1..cb3ebc1e68 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -217,6 +217,7 @@ public: IMemoryQuotaManager::TPtr MemoryQuotaManager; const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings + NWilson::TTraceId TraceId; }; struct TSinkArguments { @@ -247,6 +248,7 @@ public: const NKikimr::NMiniKQL::THolderFactory& HolderFactory; NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + NWilson::TTraceId TraceId; }; struct TOutputTransformArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index afea078bf1..9d477d4072 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1593,7 +1593,8 @@ protected: .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr, .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager, .SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr), - .Arena = Task.GetArena() + .Arena = Task.GetArena(), + .TraceId = ComputeActorSpan.GetTraceId() }); } catch (const std::exception& ex) { throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); @@ -1623,7 +1624,8 @@ protected: .TypeEnv = typeEnv, .HolderFactory = holderFactory, .ProgramBuilder = *transform.ProgramBuilder, - .Alloc = TaskRunner->GetAllocatorPtr() + .Alloc = TaskRunner->GetAllocatorPtr(), + .TraceId = ComputeActorSpan.GetTraceId() }); } catch (const std::exception& ex) { throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); |