diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-04-25 18:15:31 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-04-25 18:15:31 +0300 |
commit | 9bcfd8239b05d3e323f4a58681a4cdde0675603b (patch) | |
tree | 6be90488b691bdd4830c8180eddf9a97c141784d | |
parent | 65f6a96c441ead6ae099f36f15473eaa3de654b5 (diff) | |
download | ydb-9bcfd8239b05d3e323f4a58681a4cdde0675603b.tar.gz |
Support InternalCall flag for internal local rpc requests
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 16 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_event_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 |
16 files changed, 83 insertions, 31 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index af1585462d..0f7ead8b57 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -256,6 +256,10 @@ public: // Returns internal token as a serialized message. virtual const TString& GetSerializedToken() const = 0; virtual bool IsClientLost() const = 0; + // Is this call made from inside YDB? + virtual bool IsInternalCall() const { + return false; + } }; class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe { @@ -385,7 +389,7 @@ public: virtual void Pass(const IFacilityProvider& facility) = 0; }; -// Provide methods which can be safly passed though actor system +// Provide methods which can be safely passed though actor system // as part of event class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe { public: diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index 93ac8897bb..9f1be767a8 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -30,10 +30,11 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { public: using TResp = typename TRpc::TResponse; template<typename TProto, typename TCb> - TLocalRpcCtx(TProto&& req, TCb&& cb, const TString& databaseName, const TMaybe<TString>& token) + TLocalRpcCtx(TProto&& req, TCb&& cb, const TString& databaseName, const TMaybe<TString>& token, bool internalCall) : Request(std::forward<TProto>(req)) , CbWrapper(std::forward<TCb>(cb)) , DatabaseName(databaseName) + , InternalCall(internalCall) { if (token) { InternalToken = new NACLib::TUserToken(*token); @@ -213,6 +214,10 @@ public: return Nothing(); } + bool IsInternalCall() const override { + return InternalCall; + } + private: void Reply(NProtoBuf::Message *r, ui32) override { TResp* resp = dynamic_cast<TResp*>(r); @@ -224,6 +229,7 @@ private: typename TRpc::TRequest Request; TCbWrapper CbWrapper; const TString DatabaseName; + const bool InternalCall; TIntrusiveConstPtr<NACLib::TUserToken> InternalToken; const TString EmptySerializedTokenMessage_; @@ -234,13 +240,13 @@ private: }; template<typename TRpc> -NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem) { +NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false) { auto promise = NThreading::NewPromise<typename TRpc::TResponse>(); proto.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC); using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>; - auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token); + auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, internalCall); auto actor = TRpc::CreateRpcActor(req); actorSystem->Register(actor, TMailboxType::HTSwap, actorSystem->AppData<TAppData>()->UserPoolId); @@ -248,10 +254,10 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest } template<typename TRpc> -TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx) { +TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx, bool internalCall = false) { proto.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC); - auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token); + auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token, internalCall); auto actor = TRpc::CreateRpcActor(req); return ctx.RegisterWithSameMailbox(actor); } diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index d1bbcc8bf1..1860276793 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -127,10 +127,12 @@ private: struct TKqpQuerySettings { bool DocumentApiRestricted = true; + bool IsInternalCall = false; bool operator==(const TKqpQuerySettings& other) const { return - DocumentApiRestricted == other.DocumentApiRestricted; + DocumentApiRestricted == other.DocumentApiRestricted && + IsInternalCall == other.IsInternalCall; } bool operator!=(const TKqpQuerySettings& other) { @@ -143,7 +145,7 @@ struct TKqpQuerySettings { bool operator>=(const TKqpQuerySettings&) = delete; size_t GetHash() const noexcept { - auto tuple = std::make_tuple(DocumentApiRestricted); + auto tuple = std::make_tuple(DocumentApiRestricted, IsInternalCall); return THash<decltype(tuple)>()(tuple); } }; @@ -448,6 +450,10 @@ struct TEvKqp { return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size(); } + bool IsInternalCall() const { + return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall(); + } + ui64 GetParametersSize() const { if (ParametersSize > 0) { return ParametersSize; diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index 66842a2557..1088f60229 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -88,6 +88,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds()); Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds()); } + Record.MutableRequest()->SetIsInternalCall(RequestCtx->IsInternalCall()); RequestCtx.reset(); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7de3108860..23096d9ed5 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -103,10 +103,11 @@ public: Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Query.Cluster, Query.Database, Config, ModuleResolverState->ModuleResolver, - HttpGateway, AppData(ctx)->FunctionRegistry, false); + HttpGateway, AppData(ctx)->FunctionRegistry, false, Query.Settings.IsInternalCall); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = Query.Settings.DocumentApiRestricted; + prepareSettings.IsInternalCall = Query.Settings.IsInternalCall; NCpuTime::TCpuTimer timer(CompileCpuTime); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 0949fc95a1..160c313d7b 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -294,7 +294,7 @@ public: private: size_t MaxSize = 0; TRequestsList Queue; - THashMap<TKqpQueryId, TRequestsIteratorSet, THash<TKqpQueryId>> QueryIndex; + THashMap<TKqpQueryId, TRequestsIteratorSet> QueryIndex; THashMap<TKqpQueryId, TKqpCompileRequest> ActiveRequests; }; @@ -382,7 +382,7 @@ private: Config.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin || Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead || Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead || - Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract || + Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract || Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) { LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index a2de3e62de..802ce2a7b8 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -881,12 +881,14 @@ public: TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, NYql::IHTTPGateway::TPtr httpGateway, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges) + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, + bool isInternalCall) : Gateway(gateway) , Cluster(cluster) , ExprCtx(new TExprContext()) , ModuleResolver(moduleResolver) , KeepConfigChanges(keepConfigChanges) + , IsInternalCall(isInternalCall) , HttpGateway(std::move(httpGateway)) , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider)) , ClustersMap({{Cluster, TString(KikimrProviderName)}}) @@ -1246,6 +1248,9 @@ private: if (settings.DocumentApiRestricted) { SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted; } + if (settings.IsInternalCall) { + SessionCtx->Query().IsInternalCall = *settings.IsInternalCall; + } TMaybe<TSqlVersion> sqlVersion; auto queryExpr = CompileYqlQuery(query, /* isSql */ true, /* sqlAutoCommit */ false, ctx, sqlVersion); @@ -1268,6 +1273,9 @@ private: if (settings.DocumentApiRestricted) { SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted; } + if (settings.IsInternalCall) { + SessionCtx->Query().IsInternalCall = *settings.IsInternalCall; + } TMaybe<TSqlVersion> sqlVersion; auto queryExpr = CompileYqlQuery(queryAst, false, false, ctx, sqlVersion); @@ -1291,6 +1299,9 @@ private: if (settings.DocumentApiRestricted) { SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted; } + if (settings.IsInternalCall) { + SessionCtx->Query().IsInternalCall = *settings.IsInternalCall; + } // TODO: Support PG TMaybe<TSqlVersion> sqlVersion = 1; @@ -1478,7 +1489,7 @@ private: // Kikimr provider auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner); - auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory); + auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory, IsInternalCall); auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, queryExecutor); FillSettings.AllResultsBytesLimit = Nothing(); @@ -1583,6 +1594,7 @@ private: THolder<TExprContext> ExprCtx; IModuleResolver::TPtr ModuleResolver; bool KeepConfigChanges; + bool IsInternalCall; NYql::IHTTPGateway::TPtr HttpGateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; @@ -1621,10 +1633,10 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, - NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges) + NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall) { return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, std::move(httpGateway), funcRegistry, - keepConfigChanges); + keepConfigChanges, isInternalCall); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index d5eb68e19d..45f1cd5509 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -19,9 +19,10 @@ public: struct TPrepareSettings { TMaybe<bool> DocumentApiRestricted; + TMaybe<bool> IsInternalCall; TString ToString() const { - return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " }"; + return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " IsInternalCall: " << IsInternalCall << " }"; } }; @@ -74,7 +75,7 @@ public: TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver, - NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false); + NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 87d46207c7..8fcfd98fff 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -99,11 +99,14 @@ public: TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types, - const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, + bool isInternalCall) : Gateway(gateway) , SessionCtx(sessionCtx) , Types(types) - , ExternalSourceFactory(externalSourceFactory) {} + , ExternalSourceFactory(externalSourceFactory) + , IsInternalCall(isInternalCall) + {} TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; @@ -133,7 +136,7 @@ public: auto& result = emplaceResult.first->second; auto future = Gateway->LoadTableMetadata(clusterName, tableName, - IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats())); + IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats()).WithPrivateTables(IsInternalCall)); futures.push_back(future.Apply([result, queryType] (const NThreading::TFuture<IKikimrGateway::TTableMetadataResult>& future) { @@ -255,6 +258,7 @@ private: TIntrusivePtr<TKikimrSessionContext> SessionCtx; TTypeAnnotationContext& Types; NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; + const bool IsInternalCall; THashMap<std::pair<TString, TString>, std::shared_ptr<IKikimrGateway::TTableMetadataResult>> LoadResults; NThreading::TFuture<void> AsyncFuture; @@ -329,7 +333,8 @@ public: TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, - const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, + bool isInternalCall) : FunctionRegistry(functionRegistry) , Types(types) , Gateway(gateway) @@ -337,7 +342,7 @@ public: , ExternalSourceFactory(externalSourceFactory) , ConfigurationTransformer(new TKikimrConfigurationTransformer(sessionCtx, types)) , IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx)) - , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory)) + , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall)) , TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types)) , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx)) @@ -769,17 +774,19 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource( TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, - const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, + bool isInternalCall) { - return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory); + return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory, isInternalCall); } TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types, - const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, + bool isInternalCall) { - return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory); + return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall); } } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 1d104a4b83..1523938bae 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -98,6 +98,8 @@ struct TKikimrQueryContext : TThrRefBase { // full mode can be enabled explicitly. bool DocumentApiRestricted = true; + bool IsInternalCall = false; + std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery; std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; NKikimr::NKqp::TQueryData::TPtr QueryData; @@ -454,7 +456,8 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource( TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, - const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory); + const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory, + bool isInternalCall); TIntrusivePtr<IDataProvider> CreateKikimrDataSink( const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index d1d4702db5..b89d03ada6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -158,7 +158,8 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types, - const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory); + const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory, + bool isInternalCall); TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx); TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer( diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 485598a61c..c68353cdad 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -427,7 +427,7 @@ public: using TEvCreateSessionRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse>; Ydb::Table::CreateSessionRequest req; - Subscribe<Ydb::Table::CreateSessionResponse, TEvPrivate::TEvCreateSessionResult>(NRpcService::DoLocalRpc<TEvCreateSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem())); + Subscribe<Ydb::Table::CreateSessionResponse, TEvPrivate::TEvCreateSessionResult>(NRpcService::DoLocalRpc<TEvCreateSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); } void RunDeleteSession() { @@ -435,7 +435,7 @@ public: Ydb::Table::DeleteSessionResponse>; Ydb::Table::DeleteSessionRequest req; req.set_session_id(SessionId); - Subscribe<Ydb::Table::DeleteSessionResponse, TEvPrivate::TEvDeleteSessionResult>(NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem())); + Subscribe<Ydb::Table::DeleteSessionResponse, TEvPrivate::TEvDeleteSessionResult>(NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); } void RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params) { @@ -452,7 +452,7 @@ public: auto p = params->Build(); *req.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(p); } - Subscribe<Ydb::Table::ExecuteDataQueryResponse, TEvPrivate::TEvDataQueryResult>(NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem())); + Subscribe<Ydb::Table::ExecuteDataQueryResponse, TEvPrivate::TEvDataQueryResult>(NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); } virtual void RunQuery() = 0; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 4220ba7cee..c18909c74f 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -147,6 +147,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() if (query) { query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_; + query->Settings.IsInternalCall = IsInternalCall(); } auto compileDeadline = QueryDeadlines.TimeoutAt; @@ -182,6 +183,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque if (query) { query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_; + query->Settings.IsInternalCall = IsInternalCall(); } auto compileDeadline = QueryDeadlines.TimeoutAt; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index bc1951b69a..993bbe5b16 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -271,6 +271,10 @@ public: ); } + bool IsInternalCall() const { + return RequestEv->IsInternalCall(); + } + void ResetTimer() { if (CurrentTimer) { CpuTime += CurrentTimer->GetTime(); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index aeab7f2dc4..a26222a5fe 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -62,6 +62,7 @@ struct TKqpQueryState { NYql::TKikimrQueryDeadlines QueryDeadlines; ui32 ReplyFlags = 0; bool KeepSession = false; + bool IsInternalCall = false; TMaybe<NKikimrKqp::TRlPath> RlPath; }; @@ -206,6 +207,7 @@ public: QueryState->ReplyFlags = queryRequest.GetReplyFlags(); QueryState->UserToken = new NACLib::TUserToken(event.GetUserToken()); QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); + QueryState->IsInternalCall = queryRequest.GetIsInternalCall(); if (GetStatsMode(queryRequest, EKikimrStatsMode::None) > EKikimrStatsMode::Basic) { QueryState->ReplyFlags |= NKikimrKqp::QUERY_REPLY_FLAG_AST; @@ -683,6 +685,7 @@ private: case NKikimrKqp::QUERY_TYPE_SQL_DML: { IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType); + prepareSettings.IsInternalCall = QueryState->IsInternalCall; QueryState->AsyncQueryResult = KqpHost->PrepareDataQuery(query, prepareSettings); break; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 21b063eb34..11fa10f60c 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -98,6 +98,7 @@ message TQueryRequest { optional TTopicOperations TopicOperations = 22; optional bool UsePublicResponseDataFormat = 23; map<string, Ydb.TypedValue> YdbParameters = 24; + optional bool IsInternalCall = 25; } message TKqpPathIdProto { |