aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <yulia@ydb.tech>2024-01-04 19:45:49 +0100
committerGitHub <noreply@github.com>2024-01-04 19:45:49 +0100
commitcc6cf5ce9a4267b3eacb411407625a0731acb887 (patch)
treeea53500f0b6d74412fab557ef2df28ae2229ca98
parent30e973621a71ea7226bce396b036b546802d6425 (diff)
downloadydb-cc6cf5ce9a4267b3eacb411407625a0731acb887.tar.gz
trace(kqp): add tracing ro read actors (#841)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp23
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp71
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.h5
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_trace.cpp64
-rw-r--r--ydb/library/wilson_ids/wilson.h6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
9 files changed, 141 insertions, 41 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index a3058abc1c..407974c586 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -349,7 +349,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit);
auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
- AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
+ AppData()->FunctionRegistry, settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
if (optimizeProtoForLocalExecution) {
TVector<google::protobuf::Message*>& taskSourceSettings = static_cast<TKqpComputeActor*>(computeActor)->MutableTaskSourceSettings();
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 2d2eca1845..9d1815092f 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -17,6 +17,7 @@
#include <library/cpp/threading/hot_swap/hot_swap.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/wilson_ids/wilson.h>
#include <util/generic/intrlist.h>
@@ -399,6 +400,7 @@ public:
, Counters(counters)
, UseFollowers(false)
, PipeCacheId(MainPipeCacheId)
+ , ReadActorSpan(TWilsonKqp::ReadActor, NWilson::TTraceId(args.TraceId), "ReadActor")
{
Y_ABORT_UNLESS(Arena);
Y_ABORT_UNLESS(settings->GetArena() == Arena->Get());
@@ -569,6 +571,9 @@ public:
ResolveShards[ResolveShardId] = state;
ResolveShardId += 1;
+ ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
+ "WaitForShardsResolve", NWilson::EFlags::AUTO_END);
+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
@@ -617,9 +622,13 @@ public:
}
}
+ ReadActorStateSpan.EndError(error);
+
return RuntimeError(error, statusCode);
}
+ ReadActorStateSpan.EndOk();
+
auto keyDesc = std::move(request->ResultSet[0].KeyDescription);
if (keyDesc->GetPartitions().size() == 1) {
@@ -896,10 +905,8 @@ public:
Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);
- NWilson::TTraceId traceId; // TODO: get traceId from kqp.
-
Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
- IEventHandle::FlagTrackDelivery, 0, std::move(traceId));
+ IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId());
if (!FirstShardStarted) {
state->IsFirst = true;
@@ -1385,6 +1392,8 @@ public:
}
}
TBase::PassAway();
+
+ ReadActorSpan.End();
}
void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
@@ -1395,6 +1404,11 @@ public:
NYql::TIssues issues;
issues.AddIssue(std::move(issue));
+
+ if (ReadActorSpan) {
+ ReadActorSpan.EndError(issues.ToOneLineString());
+ }
+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}
@@ -1491,6 +1505,9 @@ private:
size_t TotalRetries = 0;
bool FirstShardStarted = false;
+
+ NWilson::TSpan ReadActorSpan;
+ NWilson::TSpan ReadActorStateSpan;
};
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 6834a048c4..b51d0d3b7f 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -1,19 +1,20 @@
#include "kqp_stream_lookup_actor.h"
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/core/kqp/common/kqp_event_ids.h>
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
-#include <ydb/core/kqp/runtime/kqp_scan_data.h>
-#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
+#include <ydb/library/wilson_ids/wilson.h>
namespace NKikimr {
namespace NKqp {
@@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10;
class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
public:
- TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
- const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
- const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ TKqpStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TIntrusivePtr<TKqpCounters> counters)
- : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
- , InputIndex(inputIndex)
- , Input(input)
- , ComputeActorId(computeActorId)
- , TypeEnv(typeEnv)
- , Alloc(alloc)
+ : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << args.InputIndex << ", CA Id " << args.ComputeActorId)
+ , InputIndex(args.InputIndex)
+ , Input(args.TransformInput)
+ , ComputeActorId(args.ComputeActorId)
+ , TypeEnv(args.TypeEnv)
+ , Alloc(args.Alloc)
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
- , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc))
+ , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
+ , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
{
- IngressStats.Level = statsLevel;
+ IngressStats.Level = args.StatsLevel;
}
virtual ~TKqpStreamLookupActor() {
@@ -174,6 +173,8 @@ private:
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
+
+ LookupActorSpan.End();
}
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
@@ -234,10 +235,15 @@ private:
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
- return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: "
- << StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR);
+ TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
+ << StreamLookupWorker->GetTablePath();
+ LookupActorStateSpan.EndError(errorMsg);
+
+ return RuntimeError(errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR);
}
+ LookupActorStateSpan.EndOk();
+
auto& resultSet = ev->Get()->Request->ResultSet;
YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)");
Partitioning = resultSet[0].KeyDescription->Partitioning;
@@ -342,8 +348,11 @@ private:
<< " was resolved: " << !!Partitioning);
if (!Partitioning) {
- RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
- << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
+ TString errorMsg = TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
+ << " (request timeout exceeded)";
+ LookupActorStateSpan.EndError(errorMsg);
+
+ RuntimeError(errorMsg, NYql::NDqProto::StatusIds::TIMEOUT);
}
}
@@ -392,7 +401,7 @@ private:
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true),
- IEventHandle::FlagTrackDelivery);
+ IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId());
read.State = EReadState::Running;
@@ -438,6 +447,9 @@ private:
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
Counters->IteratorsShardResolve->Inc();
+ LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
+ "WaitForShardsResolve", NWilson::EFlags::AUTO_END);
+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
@@ -467,6 +479,11 @@ private:
NYql::TIssues issues;
issues.AddIssue(std::move(issue));
+
+ if (LookupActorSpan) {
+ LookupActorSpan.EndError(issues.ToOneLineString());
+ }
+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}
@@ -495,17 +512,15 @@ private:
ui64 ReadBytesCount = 0;
TIntrusivePtr<TKqpCounters> Counters;
+ NWilson::TSpan LookupActorSpan;
+ NWilson::TSpan LookupActorStateSpan;
};
} // namespace
-std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
- NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
- auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
- alloc, inputDesc, std::move(settings), counters);
+ auto actor = new TKqpStreamLookupActor(std::move(args), std::move(settings), counters);
return {actor, actor};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
index 7e8e266dbc..cb1c1ba8c2 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
@@ -7,10 +7,7 @@
namespace NKikimr {
namespace NKqp {
-std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
- NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);
} // namespace NKqp
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
index ddcd068a20..d244303fd9 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
@@ -7,8 +7,7 @@ namespace NKqp {
void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
- return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv,
- args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters);
+ return CreateStreamLookupActor(std::move(args), std::move(settings), counters);
});
}
diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp
index 8438c1db04..2b377de086 100644
--- a/ydb/core/tx/datashard/datashard_ut_trace.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp
@@ -364,7 +364,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
}
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
- ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
+ ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
@@ -377,6 +377,68 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}
+ Y_UNIT_TEST(TestTraceDistributedSelectViaReadActors) {
+ auto [runtime, server, sender] = TestCreateServer();
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
+
+ FakeWilsonUploader* uploader = new FakeWilsonUploader();
+ TActorId uploaderId = runtime.Register(uploader, 0);
+ runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
+ runtime.SimulateSleep(TDuration::Seconds(10));
+
+ SplitTable(runtime, server, 5);
+
+ ExecSQL(
+ server,
+ sender,
+ "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
+ true,
+ Ydb::StatusIds::SUCCESS
+ );
+
+ ExecSQL(
+ server,
+ sender,
+ "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
+ true,
+ Ydb::StatusIds::SUCCESS
+ );
+
+ NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
+
+ ExecSQL(
+ server,
+ sender,
+ "SELECT * FROM `/Root/table-1`;",
+ true,
+ Ydb::StatusIds::SUCCESS,
+ std::move(traceId)
+ );
+
+ uploader->BuildTraceTrees();
+
+ UNIT_ASSERT_EQUAL(1, uploader->Traces.size());
+
+ FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second;
+
+ auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
+ UNIT_ASSERT(readActorSpan);
+
+ auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
+ UNIT_ASSERT_EQUAL(dsReads.size(), 2);
+
+ std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
+ "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
+ "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
+ "(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
+ "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
+ "(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
+ "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
+ "[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
+ UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
+ }
+
Y_UNIT_TEST(TestTraceWriteImmediateOnShard) {
auto [runtime, server, sender] = TestCreateServer();
diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h
index ba73508a43..32cfce0699 100644
--- a/ydb/library/wilson_ids/wilson.h
+++ b/ydb/library/wilson_ids/wilson.h
@@ -36,6 +36,12 @@ namespace NKikimr {
ProposeTransaction = 9,
ComputeActor = 9,
+
+ ReadActor = 9,
+ ReadActorShardsResolve = 10,
+
+ LookupActor = 9,
+ LookupActorShardsResolve = 10,
};
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index f1fd8dddf1..cb3ebc1e68 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -217,6 +217,7 @@ public:
IMemoryQuotaManager::TPtr MemoryQuotaManager;
const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally
TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings
+ NWilson::TTraceId TraceId;
};
struct TSinkArguments {
@@ -247,6 +248,7 @@ public:
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ NWilson::TTraceId TraceId;
};
struct TOutputTransformArguments {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index afea078bf1..9d477d4072 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1593,7 +1593,8 @@ protected:
.Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
- .Arena = Task.GetArena()
+ .Arena = Task.GetArena(),
+ .TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
@@ -1623,7 +1624,8 @@ protected:
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.ProgramBuilder = *transform.ProgramBuilder,
- .Alloc = TaskRunner->GetAllocatorPtr()
+ .Alloc = TaskRunner->GetAllocatorPtr(),
+ .TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();