diff options
author | makostrov <makostrov@yandex-team.com> | 2023-10-06 18:55:55 +0300 |
---|---|---|
committer | makostrov <makostrov@yandex-team.com> | 2023-10-06 22:29:59 +0300 |
commit | af083c863566a279ae4b04b3835b0b38dc8c3729 (patch) | |
tree | 84ad6c9ce5f99fe3679251631912a7cdfdb2b2a0 | |
parent | 67ea49de7a21d7bbb8836363af40226716c892c0 (diff) | |
download | ydb-af083c863566a279ae4b04b3835b0b38dc8c3729.tar.gz |
Add Request Context to Compute Actor
KIKIMR-19303
Теперь на каждый запрос к БД, будет заводиться свой TraceId и храниться в структуре TUserRequestContext, вместе с некоторыми дополнительными параметрами
Далее этот контекст пробрасывается в Executer-ы и Planner. B этом PR я пробросил его ещё и в Compute Actor
Далее в логах выводится этот Контекст запроса
Это нужно для дебаг, чтобы по запросу было проще найти все логи, которые относятся непосредственно к нему
25 files changed, 127 insertions, 29 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt index 99e0026d267..5a5b9167f10 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt @@ -62,6 +62,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index 3dcd30969b6..11c2296e52f 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -63,6 +63,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt index 3dcd30969b6..11c2296e52f 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt @@ -63,6 +63,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt index 99e0026d267..5a5b9167f10 100644 --- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt @@ -62,6 +62,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_user_request_context.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/kqp_user_request_context.cpp b/ydb/core/kqp/common/kqp_user_request_context.cpp new file mode 100644 index 00000000000..48dcacde95d --- /dev/null +++ b/ydb/core/kqp/common/kqp_user_request_context.cpp @@ -0,0 +1,14 @@ +#include "kqp_user_request_context.h" + +namespace NKikimr::NKqp { + + void TUserRequestContext::Out(IOutputStream& o) const { + o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}"; + } + + void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) { + resultMap["TraceId"] = ctx.TraceId; + resultMap["Database"] = ctx.Database; + resultMap["SessionId"] = ctx.SessionId; + } +} diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 3fccabda861..62bf4402fa0 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -2,6 +2,7 @@ #include <util/stream/output.h> #include <util/generic/fwd.h> +#include <contrib/libs/protobuf/src/google/protobuf/map.h> namespace NKikimr::NKqp { @@ -18,10 +19,10 @@ namespace NKikimr::NKqp { , SessionId(sessionId) {} - void Out(IOutputStream& o) const { - o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}"; - } + void Out(IOutputStream& o) const; }; + + void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap); } template<> diff --git a/ydb/core/kqp/common/ya.make b/ydb/core/kqp/common/ya.make index ffde67dfcf7..6729f12b443 100644 --- a/ydb/core/kqp/common/ya.make +++ b/ydb/core/kqp/common/ya.make @@ -17,6 +17,8 @@ SRCS( kqp_types.cpp kqp.cpp kqp.h + kqp_user_request_context.cpp + kqp_user_request_context.h ) PEERDIR( diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 35034a56d8f..3e1710cc4b7 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2206,7 +2206,7 @@ private: Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), - ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks, UserRequestContext); + ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks, GetUserRequestContext()); auto err = Planner->PlanExecution(); if (err) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 5c0473abcbb..4f03839c67e 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -53,13 +53,13 @@ LWTRACE_USING(KQP_PROVIDER); namespace NKikimr { namespace NKqp { -#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) enum class EExecType { Data, @@ -128,7 +128,6 @@ public: , Planner(nullptr) , ExecuterRetriesConfig(executerRetriesConfig) , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) - , UserRequestContext(userRequestContext) , AggregationSettings(aggregation) , HasOlapTable(false) { @@ -136,6 +135,7 @@ public: TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>(); TasksGraph.GetMeta().Database = Database; TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion; + TasksGraph.GetMeta().UserRequestContext = userRequestContext; ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc); ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, @@ -796,8 +796,8 @@ protected: if (sinkName) { auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); task.Meta.SecureParams.emplace(sinkName, structuredToken); - if (UserRequestContext->TraceId) { - task.Meta.TaskParams.emplace("fq.job_id", UserRequestContext->TraceId); + if (GetUserRequestContext()->TraceId) { + task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->TraceId); // "fq.restart_count" } } @@ -1529,6 +1529,14 @@ protected: return std::move(sb); } + const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() const { + return TasksGraph.GetMeta().UserRequestContext; + } + + TIntrusivePtr<TUserRequestContext>& MutableUserRequestContext() { + return TasksGraph.GetMeta().UserRequestContext; + } + protected: IKqpGateway::TExecPhysicalRequest Request; const TString Database; @@ -1572,8 +1580,6 @@ protected: TDuration MaximalSecretsSnapshotWaitTime; bool SubscribedOnSecrets = false; - TIntrusivePtr<TUserRequestContext> UserRequestContext; - const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot; bool HasOlapTable; diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 8774fc02443..762a5b45126 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -391,6 +391,10 @@ private: Counters->Counters->LiteralTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); } + const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() { + return UserRequestContext; + } + private: IKqpGateway::TExecPhysicalRequest Request; TKqpRequestCounters::TPtr Counters; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 7178e37ee05..e6e90937f48 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -15,10 +15,10 @@ using namespace NActors; namespace NKikimr::NKqp { -#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) -#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream) +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream) using namespace NYql; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index e19abf395ba..fc359a4d287 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -399,7 +399,7 @@ private: Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, false /* isDataQuery */, - Request.MkqlMemoryLimit, nullptr, false, UserRequestContext); + Request.MkqlMemoryLimit, nullptr, false, GetUserRequestContext()); LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size()); auto err = Planner->PlanExecution(); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index a49088a0c92..f9bcd93ad72 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -1111,6 +1111,8 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N } } + SerializeCtxToMap(*tasksGraph.GetMeta().UserRequestContext, *result->MutableRequestContext()); + FillTaskMeta(stageInfo, task, *result); } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index b0398100b49..ed3b84c77e4 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/scheme/scheme_tabledefs.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -94,6 +95,7 @@ struct TGraphMeta { TIntrusivePtr<TProtoArenaHolder> Arena; TString Database; NKikimrConfig::TTableServiceConfig::EChannelTransportVersion ChannelTransportVersion; + TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext; const TIntrusivePtr<TProtoArenaHolder>& GetArenaIntrusivePtr() const { return Arena; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 56dca8bbf48..e7f4ffbb9d3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -213,11 +213,11 @@ public: } auto selfId = SelfId(); auto as = TActivationContext::ActorSystem(); - ev->Get()->SetClientLostAction(selfId, as); + ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, Settings.TableService, Settings.QueryService, std::move(id), SessionId); - if (QueryState->UserRequestContext->TraceId == "") { + if (QueryState->UserRequestContext->TraceId.empty()) { QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString(); } } @@ -989,11 +989,9 @@ public: } if (literal) { - if (QueryState) { - request.Orbit = std::move(QueryState->Orbit); - } - request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); Y_ENSURE(QueryState); + request.Orbit = std::move(QueryState->Orbit); + request.TraceId = QueryState->KqpSessionSpan.GetTraceId(); auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId(), QueryState->UserRequestContext); ++QueryState->CurrentTx; ProcessExecuterResult(response.get()); diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt index e6abdb15992..7b0917ad05b 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt @@ -38,4 +38,5 @@ target_sources(dq-actors-compute PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp ) diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt index 3b824ab6741..0fed5ab4c00 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt @@ -39,4 +39,5 @@ target_sources(dq-actors-compute PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp ) diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt index 3b824ab6741..0fed5ab4c00 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt @@ -39,4 +39,5 @@ target_sources(dq-actors-compute PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp ) diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt index e6abdb15992..7b0917ad05b 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt @@ -38,4 +38,5 @@ target_sources(dq-actors-compute PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_request_context.cpp ) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 612dc5066c1..acca39dc554 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -22,6 +22,7 @@ #include <ydb/library/yql/minikql/mkql_program_builder.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/dq/actors/dq.h> +#include <ydb/library/yql/dq/actors/compute/dq_request_context.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/wilson/wilson_span.h> @@ -30,6 +31,7 @@ #include <util/string/join.h> #include <util/system/hostname.h> + #include <any> #include <queue> @@ -129,8 +131,14 @@ protected: public: void Bootstrap() { try { - LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". "; - + { + TStringBuilder prefixBuilder; + prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". "; + if (RequestContext) { + prefixBuilder << "Ctx: " << *RequestContext << ". "; + } + LogPrefix = prefixBuilder; + } CA_LOG_D("Start compute actor " << this->SelfId() << ", task: " << Task.GetId()); Channels = new TDqComputeActorChannels(this->SelfId(), TxId, Task, !RuntimeSettings.FailOnUndelivery, @@ -1904,6 +1912,8 @@ private: if (OutputChannelSize) { OutputChannelSize->Add(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize); } + + RequestContext = MakeIntrusive<NYql::NDq::TRequestContext>(Task.GetRequestContext()); } void InitializeWatermarks() { @@ -2187,6 +2197,7 @@ protected: THashMap<ui64, TAsyncOutputTransformInfo> OutputTransformsMap; // Output index -> Transforms info bool ResumeEventScheduled = false; NDqProto::EComputeState State; + TIntrusivePtr<NYql::NDq::TRequestContext> RequestContext; struct TBasicStats { TDuration CpuTime; diff --git a/ydb/library/yql/dq/actors/compute/dq_request_context.cpp b/ydb/library/yql/dq/actors/compute/dq_request_context.cpp new file mode 100644 index 00000000000..d2024afecc5 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_request_context.cpp @@ -0,0 +1,18 @@ +#include "dq_request_context.h" + +namespace NYql::NDq { + + TRequestContext::TRequestContext(const google::protobuf::Map<TString, TString>& map) { + for (auto& [key, value] : map) { + Map.insert({key, value}); + } + } + + void TRequestContext::Out(IOutputStream& o) const { + o << "{"; + for (const auto& [key, value] : Map) { + o << " " << key << " : " << value << ". "; + } + o << "}"; + } +} diff --git a/ydb/library/yql/dq/actors/compute/dq_request_context.h b/ydb/library/yql/dq/actors/compute/dq_request_context.h new file mode 100644 index 00000000000..f93da1a9f34 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_request_context.h @@ -0,0 +1,25 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/fwd.h> +#include <contrib/libs/protobuf/src/google/protobuf/map.h> +#include <util/generic/hash.h> + +namespace NYql::NDq { + + struct TRequestContext : public TAtomicRefCount<TRequestContext> { + THashMap<TString, TString> Map; + + TRequestContext() = default; + TRequestContext(const THashMap<TString, TString>& map) : Map(map) {}; + + explicit TRequestContext(const google::protobuf::Map<TProtoStringType, TProtoStringType>& map); + + void Out(IOutputStream& o) const; + }; +} + +template<> +inline void Out<NYql::NDq::TRequestContext>(IOutputStream& o, const NYql::NDq::TRequestContext &x) { + return x.Out(o); +} diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make index 34e9e08bfb6..486920dadfd 100644 --- a/ydb/library/yql/dq/actors/compute/ya.make +++ b/ydb/library/yql/dq/actors/compute/ya.make @@ -11,6 +11,8 @@ SRCS( dq_compute_actor.cpp dq_compute_issues_buffer.cpp retry_queue.cpp + dq_request_context.h + dq_request_context.cpp ) PEERDIR( diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 90698008313..43252358d95 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -185,4 +185,5 @@ message TDqTask { optional uint32 MetaId = 15; optional bool UseLlvm = 16; repeated bytes ReadRanges = 17; + map<string, string> RequestContext = 18; } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 83bbca81b2a..324aa514554 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -404,6 +404,10 @@ public: return Arena; } + const google::protobuf::Map<TProtoStringType, TProtoStringType>& GetRequestContext() const { + return Task_->GetRequestContext(); + } + private: // external callback to retrieve parameter value. |