diff options
author | hor911 <hor911@ydb.tech> | 2023-09-20 21:45:28 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-09-20 22:03:47 +0300 |
commit | 5ea8b2e87ab48a0cfeccd1856e4c342d8bf1f68d (patch) | |
tree | 58e3816799c3b343732581060ce47c3a29263b75 | |
parent | bcf08f410f67d673ba69f09767cf07c2d88b7f1c (diff) | |
download | ydb-5ea8b2e87ab48a0cfeccd1856e4c342d8bf1f68d.tar.gz |
Move UserRequestContext to Session
-rw-r--r-- | ydb/core/kqp/common/compilation/events.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_user_request_context.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 |
13 files changed, 63 insertions, 42 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index 6af53925d0..f06eb09e4f 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -6,6 +6,7 @@ #include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/common/simple/kqp_event_ids.h> #include <ydb/core/kqp/common/simple/query_id.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/counters/kqp_counters.h> namespace NKikimr::NKqp::NPrivateEvents { @@ -13,7 +14,8 @@ namespace NKikimr::NKqp::NPrivateEvents { struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> { TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline, - TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, + const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) @@ -21,6 +23,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo , KeepInCache(keepInCache) , Deadline(deadline) , DbCounters(dbCounters) + , UserRequestContext(userRequestContext) , Orbit(std::move(orbit)) , TempTablesState(std::move(tempTablesState)) , IntrestedInResult(std::move(intrestedInResult)) @@ -37,6 +40,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo TKqpDbCountersPtr DbCounters; TMaybe<bool> DocumentApiRestricted; + TIntrusivePtr<TUserRequestContext> UserRequestContext; NLWTrace::TOrbit Orbit; TKqpTempTablesState::TConstPtr TempTablesState; @@ -46,13 +50,15 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> { TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query, TInstant deadline, - TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, + const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) , Query(query) , Deadline(deadline) , DbCounters(dbCounters) + , UserRequestContext(userRequestContext) , Orbit(std::move(orbit)) , TempTablesState(std::move(tempTablesState)) , IntrestedInResult(std::move(intrestedInResult)) @@ -66,6 +72,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: TInstant Deadline; TKqpDbCountersPtr DbCounters; + TIntrusivePtr<TUserRequestContext> UserRequestContext; NLWTrace::TOrbit Orbit; TKqpTempTablesState::TConstPtr TempTablesState; diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 131900625d..3cf4559ab6 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -1,3 +1,5 @@ +#pragma once + #include <util/stream/output.h> #include <util/generic/fwd.h> diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 2ba469bba7..a9a8344019 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -63,8 +63,8 @@ public: TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& queryId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpDbCountersPtr dbCounters, - std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + const TIntrusivePtr<TUserRequestContext>& userRequestContext, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) : Owner(owner) , ModuleResolverState(moduleResolverState) @@ -78,6 +78,7 @@ public: , Config(MakeIntrusive<TKikimrConfiguration>()) , MetadataProviderConfig(metadataProviderConfig) , CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs())) + , UserRequestContext(userRequestContext) , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") , TempTablesState(std::move(tempTablesState)) { @@ -111,7 +112,7 @@ public: TimeoutTimerActorId = CreateLongTimer(ctx, CompilationTimeout, new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup())); - TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, ""); + TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, UserRequestContext->TraceId); TKqpRequestCounters::TPtr counters = new TKqpRequestCounters; counters->Counters = Counters; @@ -303,7 +304,7 @@ private: void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) { Y_ENSURE(!ev->Get()->QueryId); - TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, ""); + TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, UserRequestContext->TraceId); if (!ev->Get()->Finished) { NCpuTime::TCpuTimer timer(CompileCpuTime); @@ -392,6 +393,7 @@ private: std::shared_ptr<TKqpCompileResult> KqpCompileResult; std::optional<TString> ReplayMessage; + TIntrusivePtr<TUserRequestContext> UserRequestContext; NWilson::TSpan CompileActorSpan; TKqpTempTablesState::TConstPtr TempTablesState; @@ -427,12 +429,13 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, - TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) + TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) { return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, moduleResolverState, counters, uid, query, userToken, dbCounters, - federatedQuerySetup, + federatedQuerySetup, userRequestContext, std::move(traceId), std::move(tempTablesState)); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index fac856ee2c..4f70f8e8b4 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -179,6 +179,7 @@ struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult, + const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, TKqpTempTablesState::TConstPtr tempTablesState = {}) : Sender(sender) @@ -188,6 +189,7 @@ struct TKqpCompileRequest { , UserToken(userToken) , Deadline(deadline) , DbCounters(dbCounters) + , UserRequestContext(userRequestContext) , Orbit(std::move(orbit)) , CompileServiceSpan(std::move(span)) , Cookie(cookie) @@ -204,6 +206,7 @@ struct TKqpCompileRequest { TKqpDbCountersPtr DbCounters; TActorId CompileActor; + TIntrusivePtr<TUserRequestContext> UserRequestContext; NLWTrace::TOrbit Orbit; NWilson::TSpan CompileServiceSpan; ui64 Cookie; @@ -479,7 +482,8 @@ private: << ", sender: " << ev->Sender << ", queryUid: " << (request.Uid ? *request.Uid : "<empty>") << ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\"" - << ", keepInCache: " << request.KeepInCache); + << ", keepInCache: " << request.KeepInCache + << *request.UserRequestContext); *Counters->CompileQueryCacheSize = QueryCache.Size(); *Counters->CompileQueryCacheBytes = QueryCache.Bytes(); @@ -554,7 +558,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), request.KeepInCache, request.UserToken, request.Deadline, dbCounters, - ev->Cookie, std::move(ev->Get()->IntrestedInResult), + ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -605,6 +609,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, true, request.UserToken, request.Deadline, dbCounters, ev->Cookie, std::move(ev->Get()->IntrestedInResult), + ev->Get()->UserRequestContext, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); @@ -762,8 +767,8 @@ private: void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) { auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, ModuleResolverState, Counters, - request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.CompileServiceSpan.GetTraceId(), - std::move(request.TempTablesState)); + request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext, + request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState)); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index ff5ac1f218..9a536a9f68 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -25,7 +25,8 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, - TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId = {}, + TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + NWilson::TTraceId traceId = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 2af84d8654..d5dd1fb68e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -128,8 +128,8 @@ public: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId, TWilsonKqp::DataExecuter, "DataExecuter") + const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter") , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) { @@ -2418,10 +2418,10 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId) + const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId); + std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 6d1775a7ab..67a94ed6e6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -2,6 +2,7 @@ #include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/common/kqp_event_ids.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -91,7 +92,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 6a378b82ff..7f4d45f06c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -82,12 +82,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -103,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId); + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 0819b06921..9211a8399f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -22,8 +22,8 @@ #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/grpc_services/local_rate_limiter.h> -#include <ydb/core/util/ulid.h> #include <ydb/services/metadata/secret/fetcher.h> #include <ydb/services/metadata/secret/snapshot.h> @@ -117,7 +117,8 @@ public: TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId, ui64 spanVerbosity = 0, TString spanName = "no_name") + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) , Database(database) , UserToken(userToken) @@ -126,6 +127,7 @@ public: , Planner(nullptr) , ExecuterRetriesConfig(executerRetriesConfig) , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) + , UserRequestContext(userRequestContext) { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>(); @@ -135,7 +137,6 @@ public: ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); - UserRequestContext = MakeIntrusive<TUserRequestContext>(TUserRequestContext(UlidGen.Next().ToString(), Database, sessionId)); // here is no SessionId } void Bootstrap() { @@ -1299,7 +1300,6 @@ protected: TDuration MaximalSecretsSnapshotWaitTime; bool SubscribedOnSecrets = false; - TULIDGenerator UlidGen; TIntrusivePtr<TUserRequestContext> UserRequestContext; private: @@ -1312,14 +1312,14 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index a8a0ba7336..4286719a7e 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -62,8 +62,8 @@ public: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId, TWilsonKqp::ScanExecuter, "ScanExecuter") + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter") , PreparedQuery(preparedQuery) , AggregationSettings(aggregation) { @@ -703,10 +703,10 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) { return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, - preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId); + preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 8bc47fa74f..073f2ea4aa 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -161,7 +161,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(s } return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid, - std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); + std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState); } std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) { @@ -197,7 +197,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque } return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, - CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); + CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState); } void TKqpQueryState::AddOffsetsToTransaction() { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 36c61a0758..771dd3d1aa 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -12,6 +12,7 @@ #include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/common/kqp_resolve.h> #include <ydb/core/kqp/common/kqp_timeouts.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/session_actor/kqp_tx.h> #include <util/generic/noncopyable.h> @@ -32,7 +33,7 @@ public: TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - NWilson::TTraceId&& traceId) + NWilson::TTraceId&& traceId, const TString& sessionId) : QueryId(queryId) , Database(database) , Cluster(cluster) @@ -41,7 +42,6 @@ public: , ProxyRequestId(ev->Cookie) , ParametersSize(ev->Get()->GetParametersSize()) , RequestActorId(ev->Get()->GetRequestActorId()) - , TraceId(ev->Get()->GetTraceId()) , IsDocumentApiRestricted_(IsDocumentApiRestricted(ev->Get()->GetRequestType())) , StartTime(TInstant::Now()) , KeepSession(ev->Get()->GetKeepSession() || longSession) @@ -61,6 +61,8 @@ public: KqpSessionSpan = NWilson::TSpan( TWilsonKqp::KqpSession, std::move(traceId), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); + + UserRequestContext = MakeIntrusive<TUserRequestContext>(ev->Get()->GetTraceId(), Database, sessionId); } // the monotonously growing counter, the ordinal number of the query, @@ -84,7 +86,7 @@ public: TActorId RequestActorId; ui64 CurrentTx = 0; - TString TraceId; + TIntrusivePtr<TUserRequestContext> UserRequestContext; bool IsDocumentApiRestricted_ = false; TInstant StartTime; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c4c29b41d4..089dbdf9a6 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -198,7 +198,7 @@ public: << "ActorId: " << SelfId() << ", " << "ActorState: " << CurrentStateFuncName() << ", "; if (Y_LIKELY(QueryState)) { - result << "TraceId: " << QueryState->TraceId << ", "; + result << "TraceId: " << QueryState->UserRequestContext->TraceId << ", "; } return result; } @@ -216,7 +216,7 @@ public: ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, - Settings.TableService, Settings.QueryService, std::move(id)); + Settings.TableService, Settings.QueryService, std::move(id), SessionId); } void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -1070,7 +1070,7 @@ public: QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()), - SessionId); + QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId)); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1318,7 +1318,7 @@ public: stats->MutableCompilation()->Swap(&QueryState->CompileStats); } - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId); YQL_ENSURE(QueryState); if (IsExecuteAction(QueryState->GetAction())) { auto ru = NRuCalc::CalcRequestUnit(*stats); |