aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-09-20 21:45:28 +0300
committerhor911 <hor911@ydb.tech>2023-09-20 22:03:47 +0300
commit5ea8b2e87ab48a0cfeccd1856e4c342d8bf1f68d (patch)
tree58e3816799c3b343732581060ce47c3a29263b75
parentbcf08f410f67d673ba69f09767cf07c2d88b7f1c (diff)
downloadydb-5ea8b2e87ab48a0cfeccd1856e4c342d8bf1f68d.tar.gz
Move UserRequestContext to Session
-rw-r--r--ydb/core/kqp/common/compilation/events.h11
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.h2
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp15
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp13
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h12
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp4
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h8
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
13 files changed, 63 insertions, 42 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h
index 6af53925d0..f06eb09e4f 100644
--- a/ydb/core/kqp/common/compilation/events.h
+++ b/ydb/core/kqp/common/compilation/events.h
@@ -6,6 +6,7 @@
#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/kqp/common/simple/query_id.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
namespace NKikimr::NKqp::NPrivateEvents {
@@ -13,7 +14,8 @@ namespace NKikimr::NKqp::NPrivateEvents {
struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline,
- TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
+ TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
+ const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
@@ -21,6 +23,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
, KeepInCache(keepInCache)
, Deadline(deadline)
, DbCounters(dbCounters)
+ , UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
@@ -37,6 +40,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TKqpDbCountersPtr DbCounters;
TMaybe<bool> DocumentApiRestricted;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
NLWTrace::TOrbit Orbit;
TKqpTempTablesState::TConstPtr TempTablesState;
@@ -46,13 +50,15 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid,
const TMaybe<TKqpQueryId>& query, TInstant deadline,
- TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
+ TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult,
+ const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
, Query(query)
, Deadline(deadline)
, DbCounters(dbCounters)
+ , UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
@@ -66,6 +72,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
NLWTrace::TOrbit Orbit;
TKqpTempTablesState::TConstPtr TempTablesState;
diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h
index 131900625d..3cf4559ab6 100644
--- a/ydb/core/kqp/common/kqp_user_request_context.h
+++ b/ydb/core/kqp/common/kqp_user_request_context.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <util/stream/output.h>
#include <util/generic/fwd.h>
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 2ba469bba7..a9a8344019 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -63,8 +63,8 @@ public:
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& queryId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpDbCountersPtr dbCounters,
- std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
: Owner(owner)
, ModuleResolverState(moduleResolverState)
@@ -78,6 +78,7 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, MetadataProviderConfig(metadataProviderConfig)
, CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs()))
+ , UserRequestContext(userRequestContext)
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
{
@@ -111,7 +112,7 @@ public:
TimeoutTimerActorId = CreateLongTimer(ctx, CompilationTimeout, new IEventHandle(SelfId(), SelfId(),
new TEvents::TEvWakeup()));
- TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, "");
+ TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, UserRequestContext->TraceId);
TKqpRequestCounters::TPtr counters = new TKqpRequestCounters;
counters->Counters = Counters;
@@ -303,7 +304,7 @@ private:
void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
Y_ENSURE(!ev->Get()->QueryId);
- TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, "");
+ TYqlLogScope logScope(ctx, NKikimrServices::KQP_YQL, YqlName, UserRequestContext->TraceId);
if (!ev->Get()->Finished) {
NCpuTime::TCpuTimer timer(CompileCpuTime);
@@ -392,6 +393,7 @@ private:
std::shared_ptr<TKqpCompileResult> KqpCompileResult;
std::optional<TString> ReplayMessage;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
NWilson::TSpan CompileActorSpan;
TKqpTempTablesState::TConstPtr TempTablesState;
@@ -427,12 +429,13 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
- TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
+ TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
{
return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig,
moduleResolverState, counters,
uid, query, userToken, dbCounters,
- federatedQuerySetup,
+ federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState));
}
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index fac856ee2c..4f70f8e8b4 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -179,6 +179,7 @@ struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult,
+ const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
TKqpTempTablesState::TConstPtr tempTablesState = {})
: Sender(sender)
@@ -188,6 +189,7 @@ struct TKqpCompileRequest {
, UserToken(userToken)
, Deadline(deadline)
, DbCounters(dbCounters)
+ , UserRequestContext(userRequestContext)
, Orbit(std::move(orbit))
, CompileServiceSpan(std::move(span))
, Cookie(cookie)
@@ -204,6 +206,7 @@ struct TKqpCompileRequest {
TKqpDbCountersPtr DbCounters;
TActorId CompileActor;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
NLWTrace::TOrbit Orbit;
NWilson::TSpan CompileServiceSpan;
ui64 Cookie;
@@ -479,7 +482,8 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
<< ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\""
- << ", keepInCache: " << request.KeepInCache);
+ << ", keepInCache: " << request.KeepInCache
+ << *request.UserRequestContext);
*Counters->CompileQueryCacheSize = QueryCache.Size();
*Counters->CompileQueryCacheBytes = QueryCache.Bytes();
@@ -554,7 +558,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
- ev->Cookie, std::move(ev->Get()->IntrestedInResult),
+ ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext,
std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -605,6 +609,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
true, request.UserToken, request.Deadline, dbCounters,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
+ ev->Get()->UserRequestContext,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));
@@ -762,8 +767,8 @@ private:
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, ModuleResolverState, Counters,
- request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.CompileServiceSpan.GetTraceId(),
- std::move(request.TempTablesState));
+ request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext,
+ request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState));
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index ff5ac1f218..9a536a9f68 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -25,7 +25,8 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
const TString& uid, const TKqpQueryId& query,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
- TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId = {},
+ TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ NWilson::TTraceId traceId = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr);
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 2af84d8654..d5dd1fb68e 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -128,8 +128,8 @@ public:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId, TWilsonKqp::DataExecuter, "DataExecuter")
+ const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter")
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
@@ -2418,10 +2418,10 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId)
+ const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
- std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId);
+ std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 6d1775a7ab..67a94ed6e6 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -2,6 +2,7 @@
#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -91,7 +92,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId);
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 6a378b82ff..7f4d45f06c 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -82,12 +82,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId)
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -103,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
- return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId);
+ return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, sessionId);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 0819b06921..9211a8399f 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -22,8 +22,8 @@
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/grpc_services/local_rate_limiter.h>
-#include <ydb/core/util/ulid.h>
#include <ydb/services/metadata/secret/fetcher.h>
#include <ydb/services/metadata/secret/snapshot.h>
@@ -117,7 +117,8 @@ public:
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId, ui64 spanVerbosity = 0, TString spanName = "no_name")
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
@@ -126,6 +127,7 @@ public:
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
+ , UserRequestContext(userRequestContext)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -135,7 +137,6 @@ public:
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
- UserRequestContext = MakeIntrusive<TUserRequestContext>(TUserRequestContext(UlidGen.Next().ToString(), Database, sessionId)); // here is no SessionId
}
void Bootstrap() {
@@ -1299,7 +1300,6 @@ protected:
TDuration MaximalSecretsSnapshotWaitTime;
bool SubscribedOnSecrets = false;
- TULIDGenerator UlidGen;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
private:
@@ -1312,14 +1312,14 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId);
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId);
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index a8a0ba7336..4286719a7e 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -62,8 +62,8 @@ public:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId, TWilsonKqp::ScanExecuter, "ScanExecuter")
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter")
, PreparedQuery(preparedQuery)
, AggregationSettings(aggregation)
{
@@ -703,10 +703,10 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- TDuration maximalSecretsSnapshotWaitTime, const TString& sessionId)
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig,
- preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, sessionId);
+ preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index 8bc47fa74f..073f2ea4aa 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -161,7 +161,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(s
}
return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid,
- std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
+ std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
}
std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) {
@@ -197,7 +197,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
}
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid,
- CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
+ CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
}
void TKqpQueryState::AddOffsetsToTransaction() {
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 36c61a0758..771dd3d1aa 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -12,6 +12,7 @@
#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/session_actor/kqp_tx.h>
#include <util/generic/noncopyable.h>
@@ -32,7 +33,7 @@ public:
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
- NWilson::TTraceId&& traceId)
+ NWilson::TTraceId&& traceId, const TString& sessionId)
: QueryId(queryId)
, Database(database)
, Cluster(cluster)
@@ -41,7 +42,6 @@ public:
, ProxyRequestId(ev->Cookie)
, ParametersSize(ev->Get()->GetParametersSize())
, RequestActorId(ev->Get()->GetRequestActorId())
- , TraceId(ev->Get()->GetTraceId())
, IsDocumentApiRestricted_(IsDocumentApiRestricted(ev->Get()->GetRequestType()))
, StartTime(TInstant::Now())
, KeepSession(ev->Get()->GetKeepSession() || longSession)
@@ -61,6 +61,8 @@ public:
KqpSessionSpan = NWilson::TSpan(
TWilsonKqp::KqpSession, std::move(traceId),
"Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+
+ UserRequestContext = MakeIntrusive<TUserRequestContext>(ev->Get()->GetTraceId(), Database, sessionId);
}
// the monotonously growing counter, the ordinal number of the query,
@@ -84,7 +86,7 @@ public:
TActorId RequestActorId;
ui64 CurrentTx = 0;
- TString TraceId;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
bool IsDocumentApiRestricted_ = false;
TInstant StartTime;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c4c29b41d4..089dbdf9a6 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -198,7 +198,7 @@ public:
<< "ActorId: " << SelfId() << ", "
<< "ActorState: " << CurrentStateFuncName() << ", ";
if (Y_LIKELY(QueryState)) {
- result << "TraceId: " << QueryState->TraceId << ", ";
+ result << "TraceId: " << QueryState->UserRequestContext->TraceId << ", ";
}
return result;
}
@@ -216,7 +216,7 @@ public:
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
- Settings.TableService, Settings.QueryService, std::move(id));
+ Settings.TableService, Settings.QueryService, std::move(id), SessionId);
}
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -1070,7 +1070,7 @@ public:
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()),
- SessionId);
+ QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId));
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -1318,7 +1318,7 @@ public:
stats->MutableCompilation()->Swap(&QueryState->CompileStats);
}
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId);
YQL_ENSURE(QueryState);
if (IsExecuteAction(QueryState->GetAction())) {
auto ru = NRuCalc::CalcRequestUnit(*stats);