aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakostrov <makostrov@yandex-team.com>2023-10-06 18:55:55 +0300
committermakostrov <makostrov@yandex-team.com>2023-10-06 22:29:59 +0300
commitaf083c863566a279ae4b04b3835b0b38dc8c3729 (patch)
tree84ad6c9ce5f99fe3679251631912a7cdfdb2b2a0
parent67ea49de7a21d7bbb8836363af40226716c892c0 (diff)
downloadydb-af083c863566a279ae4b04b3835b0b38dc8c3729.tar.gz
Add Request Context to Compute Actor
KIKIMR-19303 Теперь на каждый запрос к БД, будет заводиться свой TraceId и храниться в структуре TUserRequestContext, вместе с некоторыми дополнительными параметрами Далее этот контекст пробрасывается в Executer-ы и Planner. B этом PR я пробросил его ещё и в Compute Actor Далее в логах выводится этот Контекст запроса Это нужно для дебаг, чтобы по запросу было проще найти все логи, которые относятся непосредственно к нему
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.cpp14
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.h7
-rw-r--r--ydb/core/kqp/common/ya.make2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h30
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp10
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h15
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_request_context.cpp18
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_request_context.h25
-rw-r--r--ydb/library/yql/dq/actors/compute/ya.make2
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h4
25 files changed, 127 insertions, 29 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
index 99e0026d267..5a5b9167f10 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
@@ -62,6 +62,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index 3dcd30969b6..11c2296e52f 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -63,6 +63,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
index 3dcd30969b6..11c2296e52f 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
@@ -63,6 +63,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
index 99e0026d267..5a5b9167f10 100644
--- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
@@ -62,6 +62,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/kqp_user_request_context.cpp b/ydb/core/kqp/common/kqp_user_request_context.cpp
new file mode 100644
index 00000000000..48dcacde95d
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_user_request_context.cpp
@@ -0,0 +1,14 @@
+#include "kqp_user_request_context.h"
+
+namespace NKikimr::NKqp {
+
+ void TUserRequestContext::Out(IOutputStream& o) const {
+ o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}";
+ }
+
+ void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
+ resultMap["TraceId"] = ctx.TraceId;
+ resultMap["Database"] = ctx.Database;
+ resultMap["SessionId"] = ctx.SessionId;
+ }
+}
diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h
index 3fccabda861..62bf4402fa0 100644
--- a/ydb/core/kqp/common/kqp_user_request_context.h
+++ b/ydb/core/kqp/common/kqp_user_request_context.h
@@ -2,6 +2,7 @@
#include <util/stream/output.h>
#include <util/generic/fwd.h>
+#include <contrib/libs/protobuf/src/google/protobuf/map.h>
namespace NKikimr::NKqp {
@@ -18,10 +19,10 @@ namespace NKikimr::NKqp {
, SessionId(sessionId) {}
- void Out(IOutputStream& o) const {
- o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}";
- }
+ void Out(IOutputStream& o) const;
};
+
+ void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap);
}
template<>
diff --git a/ydb/core/kqp/common/ya.make b/ydb/core/kqp/common/ya.make
index ffde67dfcf7..6729f12b443 100644
--- a/ydb/core/kqp/common/ya.make
+++ b/ydb/core/kqp/common/ya.make
@@ -17,6 +17,8 @@ SRCS(
kqp_types.cpp
kqp.cpp
kqp.h
+ kqp_user_request_context.cpp
+ kqp_user_request_context.h
)
PEERDIR(
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 35034a56d8f..3e1710cc4b7 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2206,7 +2206,7 @@ private:
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
- ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks, UserRequestContext);
+ ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks, GetUserRequestContext());
auto err = Planner->PlanExecution();
if (err) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 5c0473abcbb..4f03839c67e 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -53,13 +53,13 @@ LWTRACE_USING(KQP_PROVIDER);
namespace NKikimr {
namespace NKqp {
-#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
enum class EExecType {
Data,
@@ -128,7 +128,6 @@ public:
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
- , UserRequestContext(userRequestContext)
, AggregationSettings(aggregation)
, HasOlapTable(false)
{
@@ -136,6 +135,7 @@ public:
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
+ TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
@@ -796,8 +796,8 @@ protected:
if (sinkName) {
auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
task.Meta.SecureParams.emplace(sinkName, structuredToken);
- if (UserRequestContext->TraceId) {
- task.Meta.TaskParams.emplace("fq.job_id", UserRequestContext->TraceId);
+ if (GetUserRequestContext()->TraceId) {
+ task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->TraceId);
// "fq.restart_count"
}
}
@@ -1529,6 +1529,14 @@ protected:
return std::move(sb);
}
+ const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() const {
+ return TasksGraph.GetMeta().UserRequestContext;
+ }
+
+ TIntrusivePtr<TUserRequestContext>& MutableUserRequestContext() {
+ return TasksGraph.GetMeta().UserRequestContext;
+ }
+
protected:
IKqpGateway::TExecPhysicalRequest Request;
const TString Database;
@@ -1572,8 +1580,6 @@ protected:
TDuration MaximalSecretsSnapshotWaitTime;
bool SubscribedOnSecrets = false;
- TIntrusivePtr<TUserRequestContext> UserRequestContext;
-
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
bool HasOlapTable;
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 8774fc02443..762a5b45126 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -391,6 +391,10 @@ private:
Counters->Counters->LiteralTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
}
+ const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() {
+ return UserRequestContext;
+ }
+
private:
IKqpGateway::TExecPhysicalRequest Request;
TKqpRequestCounters::TPtr Counters;
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 7178e37ee05..e6e90937f48 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -15,10 +15,10 @@ using namespace NActors;
namespace NKikimr::NKqp {
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
using namespace NYql;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index e19abf395ba..fc359a4d287 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -399,7 +399,7 @@ private:
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, false /* isDataQuery */,
- Request.MkqlMemoryLimit, nullptr, false, UserRequestContext);
+ Request.MkqlMemoryLimit, nullptr, false, GetUserRequestContext());
LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
auto err = Planner->PlanExecution();
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index a49088a0c92..f9bcd93ad72 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -1111,6 +1111,8 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N
}
}
+ SerializeCtxToMap(*tasksGraph.GetMeta().UserRequestContext, *result->MutableRequestContext());
+
FillTaskMeta(stageInfo, task, *result);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index b0398100b49..ed3b84c77e4 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -94,6 +95,7 @@ struct TGraphMeta {
TIntrusivePtr<TProtoArenaHolder> Arena;
TString Database;
NKikimrConfig::TTableServiceConfig::EChannelTransportVersion ChannelTransportVersion;
+ TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
const TIntrusivePtr<TProtoArenaHolder>& GetArenaIntrusivePtr() const {
return Arena;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 56dca8bbf48..e7f4ffbb9d3 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -213,11 +213,11 @@ public:
}
auto selfId = SelfId();
auto as = TActivationContext::ActorSystem();
- ev->Get()->SetClientLostAction(selfId, as);
+ ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
- if (QueryState->UserRequestContext->TraceId == "") {
+ if (QueryState->UserRequestContext->TraceId.empty()) {
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
}
}
@@ -989,11 +989,9 @@ public:
}
if (literal) {
- if (QueryState) {
- request.Orbit = std::move(QueryState->Orbit);
- }
- request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
Y_ENSURE(QueryState);
+ request.Orbit = std::move(QueryState->Orbit);
+ request.TraceId = QueryState->KqpSessionSpan.GetTraceId();
auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId(), QueryState->UserRequestContext);
++QueryState->CurrentTx;
ProcessExecuterResult(response.get());
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
index e6abdb15992..7b0917ad05b 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
@@ -38,4 +38,5 @@ target_sources(dq-actors-compute PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
)
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
index 3b824ab6741..0fed5ab4c00 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
@@ -39,4 +39,5 @@ target_sources(dq-actors-compute PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
)
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
index 3b824ab6741..0fed5ab4c00 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
@@ -39,4 +39,5 @@ target_sources(dq-actors-compute PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
)
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
index e6abdb15992..7b0917ad05b 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
@@ -38,4 +38,5 @@ target_sources(dq-actors-compute PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 612dc5066c1..acca39dc554 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -22,6 +22,7 @@
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/dq/actors/dq.h>
+#include <ydb/library/yql/dq/actors/compute/dq_request_context.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/wilson/wilson_span.h>
@@ -30,6 +31,7 @@
#include <util/string/join.h>
#include <util/system/hostname.h>
+
#include <any>
#include <queue>
@@ -129,8 +131,14 @@ protected:
public:
void Bootstrap() {
try {
- LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
-
+ {
+ TStringBuilder prefixBuilder;
+ prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
+ if (RequestContext) {
+ prefixBuilder << "Ctx: " << *RequestContext << ". ";
+ }
+ LogPrefix = prefixBuilder;
+ }
CA_LOG_D("Start compute actor " << this->SelfId() << ", task: " << Task.GetId());
Channels = new TDqComputeActorChannels(this->SelfId(), TxId, Task, !RuntimeSettings.FailOnUndelivery,
@@ -1904,6 +1912,8 @@ private:
if (OutputChannelSize) {
OutputChannelSize->Add(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
}
+
+ RequestContext = MakeIntrusive<NYql::NDq::TRequestContext>(Task.GetRequestContext());
}
void InitializeWatermarks() {
@@ -2187,6 +2197,7 @@ protected:
THashMap<ui64, TAsyncOutputTransformInfo> OutputTransformsMap; // Output index -> Transforms info
bool ResumeEventScheduled = false;
NDqProto::EComputeState State;
+ TIntrusivePtr<NYql::NDq::TRequestContext> RequestContext;
struct TBasicStats {
TDuration CpuTime;
diff --git a/ydb/library/yql/dq/actors/compute/dq_request_context.cpp b/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
new file mode 100644
index 00000000000..d2024afecc5
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_request_context.cpp
@@ -0,0 +1,18 @@
+#include "dq_request_context.h"
+
+namespace NYql::NDq {
+
+ TRequestContext::TRequestContext(const google::protobuf::Map<TString, TString>& map) {
+ for (auto& [key, value] : map) {
+ Map.insert({key, value});
+ }
+ }
+
+ void TRequestContext::Out(IOutputStream& o) const {
+ o << "{";
+ for (const auto& [key, value] : Map) {
+ o << " " << key << " : " << value << ". ";
+ }
+ o << "}";
+ }
+}
diff --git a/ydb/library/yql/dq/actors/compute/dq_request_context.h b/ydb/library/yql/dq/actors/compute/dq_request_context.h
new file mode 100644
index 00000000000..f93da1a9f34
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_request_context.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/fwd.h>
+#include <contrib/libs/protobuf/src/google/protobuf/map.h>
+#include <util/generic/hash.h>
+
+namespace NYql::NDq {
+
+ struct TRequestContext : public TAtomicRefCount<TRequestContext> {
+ THashMap<TString, TString> Map;
+
+ TRequestContext() = default;
+ TRequestContext(const THashMap<TString, TString>& map) : Map(map) {};
+
+ explicit TRequestContext(const google::protobuf::Map<TProtoStringType, TProtoStringType>& map);
+
+ void Out(IOutputStream& o) const;
+ };
+}
+
+template<>
+inline void Out<NYql::NDq::TRequestContext>(IOutputStream& o, const NYql::NDq::TRequestContext &x) {
+ return x.Out(o);
+}
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index 34e9e08bfb6..486920dadfd 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -11,6 +11,8 @@ SRCS(
dq_compute_actor.cpp
dq_compute_issues_buffer.cpp
retry_queue.cpp
+ dq_request_context.h
+ dq_request_context.cpp
)
PEERDIR(
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 90698008313..43252358d95 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -185,4 +185,5 @@ message TDqTask {
optional uint32 MetaId = 15;
optional bool UseLlvm = 16;
repeated bytes ReadRanges = 17;
+ map<string, string> RequestContext = 18;
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 83bbca81b2a..324aa514554 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -404,6 +404,10 @@ public:
return Arena;
}
+ const google::protobuf::Map<TProtoStringType, TProtoStringType>& GetRequestContext() const {
+ return Task_->GetRequestContext();
+ }
+
private:
// external callback to retrieve parameter value.