aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-04-25 18:15:31 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-04-25 18:15:31 +0300
commit9bcfd8239b05d3e323f4a58681a4cdde0675603b (patch)
tree6be90488b691bdd4830c8180eddf9a97c141784d
parent65f6a96c441ead6ae099f36f15473eaa3de654b5 (diff)
downloadydb-9bcfd8239b05d3e323f4a58681a4cdde0675603b.tar.gz
Support InternalCall flag for internal local rpc requests
-rw-r--r--ydb/core/grpc_services/base/base.h6
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h16
-rw-r--r--ydb/core/kqp/common/kqp.h10
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp3
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp20
-rw-r--r--ydb/core/kqp/host/kqp_host.h5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp25
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h3
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp6
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp3
-rw-r--r--ydb/core/protos/kqp.proto1
16 files changed, 83 insertions, 31 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index af1585462d..0f7ead8b57 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -256,6 +256,10 @@ public:
// Returns internal token as a serialized message.
virtual const TString& GetSerializedToken() const = 0;
virtual bool IsClientLost() const = 0;
+ // Is this call made from inside YDB?
+ virtual bool IsInternalCall() const {
+ return false;
+ }
};
class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe {
@@ -385,7 +389,7 @@ public:
virtual void Pass(const IFacilityProvider& facility) = 0;
};
-// Provide methods which can be safly passed though actor system
+// Provide methods which can be safely passed though actor system
// as part of event
class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe {
public:
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index 93ac8897bb..9f1be767a8 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -30,10 +30,11 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
public:
using TResp = typename TRpc::TResponse;
template<typename TProto, typename TCb>
- TLocalRpcCtx(TProto&& req, TCb&& cb, const TString& databaseName, const TMaybe<TString>& token)
+ TLocalRpcCtx(TProto&& req, TCb&& cb, const TString& databaseName, const TMaybe<TString>& token, bool internalCall)
: Request(std::forward<TProto>(req))
, CbWrapper(std::forward<TCb>(cb))
, DatabaseName(databaseName)
+ , InternalCall(internalCall)
{
if (token) {
InternalToken = new NACLib::TUserToken(*token);
@@ -213,6 +214,10 @@ public:
return Nothing();
}
+ bool IsInternalCall() const override {
+ return InternalCall;
+ }
+
private:
void Reply(NProtoBuf::Message *r, ui32) override {
TResp* resp = dynamic_cast<TResp*>(r);
@@ -224,6 +229,7 @@ private:
typename TRpc::TRequest Request;
TCbWrapper CbWrapper;
const TString DatabaseName;
+ const bool InternalCall;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken;
const TString EmptySerializedTokenMessage_;
@@ -234,13 +240,13 @@ private:
};
template<typename TRpc>
-NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem) {
+NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false) {
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
proto.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC);
using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>;
- auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token);
+ auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, internalCall);
auto actor = TRpc::CreateRpcActor(req);
actorSystem->Register(actor, TMailboxType::HTSwap, actorSystem->AppData<TAppData>()->UserPoolId);
@@ -248,10 +254,10 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest
}
template<typename TRpc>
-TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx) {
+TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx, bool internalCall = false) {
proto.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC);
- auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token);
+ auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token, internalCall);
auto actor = TRpc::CreateRpcActor(req);
return ctx.RegisterWithSameMailbox(actor);
}
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index d1bbcc8bf1..1860276793 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -127,10 +127,12 @@ private:
struct TKqpQuerySettings {
bool DocumentApiRestricted = true;
+ bool IsInternalCall = false;
bool operator==(const TKqpQuerySettings& other) const {
return
- DocumentApiRestricted == other.DocumentApiRestricted;
+ DocumentApiRestricted == other.DocumentApiRestricted &&
+ IsInternalCall == other.IsInternalCall;
}
bool operator!=(const TKqpQuerySettings& other) {
@@ -143,7 +145,7 @@ struct TKqpQuerySettings {
bool operator>=(const TKqpQuerySettings&) = delete;
size_t GetHash() const noexcept {
- auto tuple = std::make_tuple(DocumentApiRestricted);
+ auto tuple = std::make_tuple(DocumentApiRestricted, IsInternalCall);
return THash<decltype(tuple)>()(tuple);
}
};
@@ -448,6 +450,10 @@ struct TEvKqp {
return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size();
}
+ bool IsInternalCall() const {
+ return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall();
+ }
+
ui64 GetParametersSize() const {
if (ParametersSize > 0) {
return ParametersSize;
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index 66842a2557..1088f60229 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -88,6 +88,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds());
Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds());
}
+ Record.MutableRequest()->SetIsInternalCall(RequestCtx->IsInternalCall());
RequestCtx.reset();
}
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 7de3108860..23096d9ed5 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -103,10 +103,11 @@ public:
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
KqpHost = CreateKqpHost(Gateway, Query.Cluster, Query.Database, Config, ModuleResolverState->ModuleResolver,
- HttpGateway, AppData(ctx)->FunctionRegistry, false);
+ HttpGateway, AppData(ctx)->FunctionRegistry, false, Query.Settings.IsInternalCall);
IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = Query.Settings.DocumentApiRestricted;
+ prepareSettings.IsInternalCall = Query.Settings.IsInternalCall;
NCpuTime::TCpuTimer timer(CompileCpuTime);
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 0949fc95a1..160c313d7b 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -294,7 +294,7 @@ public:
private:
size_t MaxSize = 0;
TRequestsList Queue;
- THashMap<TKqpQueryId, TRequestsIteratorSet, THash<TKqpQueryId>> QueryIndex;
+ THashMap<TKqpQueryId, TRequestsIteratorSet> QueryIndex;
THashMap<TKqpQueryId, TKqpCompileRequest> ActiveRequests;
};
@@ -382,7 +382,7 @@ private:
Config.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin ||
Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead ||
Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||
- Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract ||
+ Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract ||
Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) {
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index a2de3e62de..802ce2a7b8 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -881,12 +881,14 @@ public:
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database,
TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
NYql::IHTTPGateway::TPtr httpGateway,
- const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges)
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
+ bool isInternalCall)
: Gateway(gateway)
, Cluster(cluster)
, ExprCtx(new TExprContext())
, ModuleResolver(moduleResolver)
, KeepConfigChanges(keepConfigChanges)
+ , IsInternalCall(isInternalCall)
, HttpGateway(std::move(httpGateway))
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider))
, ClustersMap({{Cluster, TString(KikimrProviderName)}})
@@ -1246,6 +1248,9 @@ private:
if (settings.DocumentApiRestricted) {
SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted;
}
+ if (settings.IsInternalCall) {
+ SessionCtx->Query().IsInternalCall = *settings.IsInternalCall;
+ }
TMaybe<TSqlVersion> sqlVersion;
auto queryExpr = CompileYqlQuery(query, /* isSql */ true, /* sqlAutoCommit */ false, ctx, sqlVersion);
@@ -1268,6 +1273,9 @@ private:
if (settings.DocumentApiRestricted) {
SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted;
}
+ if (settings.IsInternalCall) {
+ SessionCtx->Query().IsInternalCall = *settings.IsInternalCall;
+ }
TMaybe<TSqlVersion> sqlVersion;
auto queryExpr = CompileYqlQuery(queryAst, false, false, ctx, sqlVersion);
@@ -1291,6 +1299,9 @@ private:
if (settings.DocumentApiRestricted) {
SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted;
}
+ if (settings.IsInternalCall) {
+ SessionCtx->Query().IsInternalCall = *settings.IsInternalCall;
+ }
// TODO: Support PG
TMaybe<TSqlVersion> sqlVersion = 1;
@@ -1478,7 +1489,7 @@ private:
// Kikimr provider
auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner);
- auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory);
+ auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory, IsInternalCall);
auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, queryExecutor);
FillSettings.AllResultsBytesLimit = Nothing();
@@ -1583,6 +1594,7 @@ private:
THolder<TExprContext> ExprCtx;
IModuleResolver::TPtr ModuleResolver;
bool KeepConfigChanges;
+ bool IsInternalCall;
NYql::IHTTPGateway::TPtr HttpGateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
@@ -1621,10 +1633,10 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
- NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges)
+ NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, std::move(httpGateway), funcRegistry,
- keepConfigChanges);
+ keepConfigChanges, isInternalCall);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index d5eb68e19d..45f1cd5509 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -19,9 +19,10 @@ public:
struct TPrepareSettings {
TMaybe<bool> DocumentApiRestricted;
+ TMaybe<bool> IsInternalCall;
TString ToString() const {
- return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " }";
+ return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " IsInternalCall: " << IsInternalCall << " }";
}
};
@@ -74,7 +75,7 @@ public:
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
- NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false);
+ NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 87d46207c7..8fcfd98fff 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -99,11 +99,14 @@ public:
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
TTypeAnnotationContext& types,
- const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
+ bool isInternalCall)
: Gateway(gateway)
, SessionCtx(sessionCtx)
, Types(types)
- , ExternalSourceFactory(externalSourceFactory) {}
+ , ExternalSourceFactory(externalSourceFactory)
+ , IsInternalCall(isInternalCall)
+ {}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
@@ -133,7 +136,7 @@ public:
auto& result = emplaceResult.first->second;
auto future = Gateway->LoadTableMetadata(clusterName, tableName,
- IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats()));
+ IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats()).WithPrivateTables(IsInternalCall));
futures.push_back(future.Apply([result, queryType]
(const NThreading::TFuture<IKikimrGateway::TTableMetadataResult>& future) {
@@ -255,6 +258,7 @@ private:
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
TTypeAnnotationContext& Types;
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory;
+ const bool IsInternalCall;
THashMap<std::pair<TString, TString>, std::shared_ptr<IKikimrGateway::TTableMetadataResult>> LoadResults;
NThreading::TFuture<void> AsyncFuture;
@@ -329,7 +333,8 @@ public:
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
- const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
+ bool isInternalCall)
: FunctionRegistry(functionRegistry)
, Types(types)
, Gateway(gateway)
@@ -337,7 +342,7 @@ public:
, ExternalSourceFactory(externalSourceFactory)
, ConfigurationTransformer(new TKikimrConfigurationTransformer(sessionCtx, types))
, IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx))
- , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory))
+ , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall))
, TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types))
, CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx))
@@ -769,17 +774,19 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
- const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
+ bool isInternalCall)
{
- return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory);
+ return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory, isInternalCall);
}
TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
TTypeAnnotationContext& types,
- const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
+ bool isInternalCall)
{
- return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory);
+ return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall);
}
} // namespace NYql
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 1d104a4b83..1523938bae 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -98,6 +98,8 @@ struct TKikimrQueryContext : TThrRefBase {
// full mode can be enabled explicitly.
bool DocumentApiRestricted = true;
+ bool IsInternalCall = false;
+
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
NKikimr::NKqp::TQueryData::TPtr QueryData;
@@ -454,7 +456,8 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
- const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory);
+ const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory,
+ bool isInternalCall);
TIntrusivePtr<IDataProvider> CreateKikimrDataSink(
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index d1d4702db5..b89d03ada6 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -158,7 +158,8 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr
TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
TTypeAnnotationContext& types,
- const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory);
+ const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory,
+ bool isInternalCall);
TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx);
TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer(
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 485598a61c..c68353cdad 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -427,7 +427,7 @@ public:
using TEvCreateSessionRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
Ydb::Table::CreateSessionResponse>;
Ydb::Table::CreateSessionRequest req;
- Subscribe<Ydb::Table::CreateSessionResponse, TEvPrivate::TEvCreateSessionResult>(NRpcService::DoLocalRpc<TEvCreateSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem()));
+ Subscribe<Ydb::Table::CreateSessionResponse, TEvPrivate::TEvCreateSessionResult>(NRpcService::DoLocalRpc<TEvCreateSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true));
}
void RunDeleteSession() {
@@ -435,7 +435,7 @@ public:
Ydb::Table::DeleteSessionResponse>;
Ydb::Table::DeleteSessionRequest req;
req.set_session_id(SessionId);
- Subscribe<Ydb::Table::DeleteSessionResponse, TEvPrivate::TEvDeleteSessionResult>(NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem()));
+ Subscribe<Ydb::Table::DeleteSessionResponse, TEvPrivate::TEvDeleteSessionResult>(NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true));
}
void RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params) {
@@ -452,7 +452,7 @@ public:
auto p = params->Build();
*req.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(p);
}
- Subscribe<Ydb::Table::ExecuteDataQueryResponse, TEvPrivate::TEvDataQueryResult>(NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem()));
+ Subscribe<Ydb::Table::ExecuteDataQueryResponse, TEvPrivate::TEvDataQueryResult>(NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true));
}
virtual void RunQuery() = 0;
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index 4220ba7cee..c18909c74f 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -147,6 +147,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest()
if (query) {
query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_;
+ query->Settings.IsInternalCall = IsInternalCall();
}
auto compileDeadline = QueryDeadlines.TimeoutAt;
@@ -182,6 +183,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
if (query) {
query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_;
+ query->Settings.IsInternalCall = IsInternalCall();
}
auto compileDeadline = QueryDeadlines.TimeoutAt;
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index bc1951b69a..993bbe5b16 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -271,6 +271,10 @@ public:
);
}
+ bool IsInternalCall() const {
+ return RequestEv->IsInternalCall();
+ }
+
void ResetTimer() {
if (CurrentTimer) {
CpuTime += CurrentTimer->GetTime();
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index aeab7f2dc4..a26222a5fe 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -62,6 +62,7 @@ struct TKqpQueryState {
NYql::TKikimrQueryDeadlines QueryDeadlines;
ui32 ReplyFlags = 0;
bool KeepSession = false;
+ bool IsInternalCall = false;
TMaybe<NKikimrKqp::TRlPath> RlPath;
};
@@ -206,6 +207,7 @@ public:
QueryState->ReplyFlags = queryRequest.GetReplyFlags();
QueryState->UserToken = new NACLib::TUserToken(event.GetUserToken());
QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
+ QueryState->IsInternalCall = queryRequest.GetIsInternalCall();
if (GetStatsMode(queryRequest, EKikimrStatsMode::None) > EKikimrStatsMode::Basic) {
QueryState->ReplyFlags |= NKikimrKqp::QUERY_REPLY_FLAG_AST;
@@ -683,6 +685,7 @@ private:
case NKikimrKqp::QUERY_TYPE_SQL_DML: {
IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType);
+ prepareSettings.IsInternalCall = QueryState->IsInternalCall;
QueryState->AsyncQueryResult = KqpHost->PrepareDataQuery(query, prepareSettings);
break;
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 21b063eb34..11fa10f60c 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -98,6 +98,7 @@ message TQueryRequest {
optional TTopicOperations TopicOperations = 22;
optional bool UsePublicResponseDataFormat = 23;
map<string, Ydb.TypedValue> YdbParameters = 24;
+ optional bool IsInternalCall = 25;
}
message TKqpPathIdProto {