diff options
author | grigoriypisar <grigoriypisar@ydb.tech> | 2023-12-07 13:57:11 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@ydb.tech> | 2023-12-07 14:32:33 +0300 |
commit | bb46cc0930549897180c81654d7dca4e40a93908 (patch) | |
tree | fa84c55792f56d328df275c4166d9069f402f851 | |
parent | d014e0f31449a0b13f21906969251d9624c3a62f (diff) | |
download | ydb-bb46cc0930549897180c81654d7dca4e40a93908.tar.gz |
generic query auto detection
Added generic query detection
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 168 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_common.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 66 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 2 |
13 files changed, 299 insertions, 21 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 513fa9501f..fb5982549a 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -63,6 +63,7 @@ public: , UserToken(userToken) , DbCounters(dbCounters) , Config(MakeIntrusive<TKikimrConfiguration>()) + , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , CompilationTimeout(TDuration::MilliSeconds(tableServiceConfig.GetCompileTimeoutMs())) , UserRequestContext(userRequestContext) @@ -80,8 +81,8 @@ public: ApplyServiceConfig(*Config, tableServiceConfig); - if (QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT) { - ui32 scriptResultRowsLimit = queryServiceConfig.GetScriptResultRowsLimit(); + if (QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT || QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) { + ui32 scriptResultRowsLimit = QueryServiceConfig.GetScriptResultRowsLimit(); if (scriptResultRowsLimit > 0) { Config->_ResultRowsLimit = scriptResultRowsLimit; } else { @@ -177,7 +178,7 @@ private: std::make_shared<TKqpTableMetadataLoader>( TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader), - ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters); + ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig); Gateway->SetToken(QueryId.Cluster, UserToken); Config->FeatureFlags = AppData(ctx)->FeatureFlags; @@ -455,6 +456,7 @@ private: TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TKqpDbCountersPtr DbCounters; TKikimrConfiguration::TPtr Config; + TQueryServiceConfig QueryServiceConfig; TMetadataProviderConfig MetadataProviderConfig; TDuration CompilationTimeout; TInstant StartTime; diff --git a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp index f73266006d..7748e7411a 100644 --- a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp +++ b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp @@ -42,7 +42,8 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway> TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) { auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem(); return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(), - actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0), TAlignedPagePoolCounters()); + actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0), + TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig.GetQueryServiceConfig()); } [[maybe_unused]] diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 68f69929b0..3e4477bb7e 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -195,11 +195,17 @@ public: virtual NThreading::TFuture<TQueryResult> StreamExecScanQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr, const TAstQuerySettings& settings, const NActors::TActorId& target, std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0; + + virtual NThreading::TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, + TQueryData::TPtr params, const TAstQuerySettings& settings, + const Ydb::Table::TransactionSettings& txSettings) = 0; + + virtual NThreading::TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) = 0; }; TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, - ui32 nodeId, TKqpRequestCounters::TPtr counters); + ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig()); bool SplitTablePath(const TString& tableName, const TString& database, std::pair<TString, TString>& pathPair, TString& error, bool createDir); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index a6003e43de..bcf3a2023d 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -427,6 +427,107 @@ private: TVector<NYql::NDqProto::TDqExecutionStats> Executions; }; +class TKqpGenericQueryRequestHandler: public TRequestHandlerBase< + TKqpGenericQueryRequestHandler, + TEvKqp::TEvQueryRequest, + TEvKqp::TEvQueryResponse, + IKqpGateway::TQueryResult> +{ + struct TResultSetDescription { + Ydb::ResultSet ResultSet; + ui64 RowCount = 0; + ui64 ByteCount = 0; + bool Initialized = false; + }; + +public: + using TRequest = TEvKqp::TEvQueryRequest; + using TResponse = TEvKqp::TEvQueryResponse; + using TResult = IKqpGateway::TQueryResult; + + using TBase = TKqpGenericQueryRequestHandler::TBase; + using TCallbackFunc = TBase::TCallbackFunc; + + TKqpGenericQueryRequestHandler(TRequest* request, ui64 rowsLimit, ui64 sizeLimit, TPromise<TResult> promise, TCallbackFunc callback) + : TBase(request, promise, callback) + , RowsLimit(rowsLimit) + , SizeLimit(sizeLimit) + {} + + void Bootstrap() { + ActorIdToProto(SelfId(), Request->Record.MutableRequestActorId()); + Send(MakeKqpProxyID(SelfId().NodeId()), Request.Release()); + Become(&TKqpGenericQueryRequestHandler::AwaitState); + } + + void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) { + auto& record = ev->Get()->Record; + + const ui32 resultSetId = record.GetQueryResultIndex(); + if (resultSetId >= ResultSets.size()) { + ResultSets.resize(resultSetId + 1); + } + + if (!ResultSets[resultSetId].Initialized) { + ResultSets[resultSetId].Initialized = true; + for (auto& column : *record.MutableResultSet()->mutable_columns()) { + ResultSets[resultSetId].ResultSet.add_columns()->CopyFrom(std::move(column)); + } + } + + if (!ResultSets[resultSetId].ResultSet.truncated()) { + for (auto& row : *record.MutableResultSet()->mutable_rows()) { + if (RowsLimit && ResultSets[resultSetId].RowCount + 1 > RowsLimit) { + ResultSets[resultSetId].ResultSet.set_truncated(true); + break; + } + + auto serializedSize = row.ByteSizeLong(); + if (SizeLimit && ResultSets[resultSetId].ByteCount + serializedSize > SizeLimit) { + ResultSets[resultSetId].ResultSet.set_truncated(true); + break; + } + + ResultSets[resultSetId].RowCount++; + ResultSets[resultSetId].ByteCount += serializedSize; + *ResultSets[resultSetId].ResultSet.add_rows() = std::move(row); + } + } + + auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max()); + Send(ev->Sender, response.Release()); + } + + using TBase::HandleResponse; + + void HandleResponse(TResponse::TPtr &ev, const TActorContext &ctx) { + auto& response = *ev->Get()->Record.GetRef().MutableResponse(); + + for (auto& resultSet : ResultSets) { + ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults()); + } + + TBase::HandleResponse(ev, ctx); + } + + STFUNC(AwaitState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqpExecuter::TEvStreamData, Handle); + HFunc(TEvKqp::TEvQueryResponse, HandleResponse); + + default: + TBase::HandleUnexpectedEvent("TKqpGenericQueryRequestHandler", ev->GetTypeRewrite()); + } + } + +private: + std::vector<TResultSetDescription> ResultSets; + const ui64 RowsLimit; + const ui64 SizeLimit; +}; + class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExecuterRequestHandler> { public: using TResult = IKqpGateway::TGenericResult; @@ -597,14 +698,15 @@ private: public: TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader, - TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters) + TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) : Cluster(cluster) , QueryType(queryType) , Database(database) , ActorSystem(actorSystem) , NodeId(nodeId) , Counters(counters) - , MetadataLoader(std::move(metadataLoader)) {} + , MetadataLoader(std::move(metadataLoader)) + , QueryServiceConfig(queryServiceConfig) {} bool HasCluster(const TString& cluster) override { return cluster == Cluster; @@ -2000,6 +2102,31 @@ public: }); } + TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params, + const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override + { + YQL_ENSURE(cluster == Cluster); + + auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); + + auto& request = *ev->Record.MutableRequest(); + request.SetCollectStats(settings.CollectStats); + + FillParameters(std::move(params), request.MutableYdbParameters()); + + auto& txControl = *request.MutableTxControl(); + txControl.mutable_begin_tx()->CopyFrom(txSettings); + txControl.set_commit_tx(true); + + return RunGenericQuery(query, NKikimrKqp::QUERY_ACTION_EXECUTE, std::move(ev)); + } + + TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) override { + YQL_ENSURE(cluster == Cluster); + + return RunGenericQuery(query, NKikimrKqp::QUERY_ACTION_EXPLAIN, MakeHolder<TEvKqp::TEvQueryRequest>()); + } + private: using TDescribeSchemeResponse = TEvSchemeShard::TEvDescribeSchemeResult; using TTransactionResponse = TEvTxUserProxy::TEvProposeTransactionStatus; @@ -2065,6 +2192,17 @@ private: return promise.GetFuture(); } + TFuture<TQueryResult> SendKqpGenericQueryRequest(TEvKqp::TEvQueryRequest* request, ui64 rowsLimit, ui64 sizeLimit, + TKqpGenericQueryRequestHandler::TCallbackFunc callback) + { + auto promise = NewPromise<TQueryResult>(); + IActor* requestHandler = new TKqpGenericQueryRequestHandler(request, rowsLimit, sizeLimit, + promise, callback); + RegisterActor(requestHandler); + + return promise.GetFuture(); + } + template<typename TRequest, typename TResponse, typename TResult> TFuture<TResult> SendActorRequest(const TActorId& actorId, TRequest* request, typename TActorRequestHandler<TRequest, TResponse, TResult>::TCallbackFunc callback) @@ -2101,6 +2239,27 @@ private: }); } + TFuture<TQueryResult> RunGenericQuery(const TString& query, NKikimrKqp::EQueryAction action, THolder<TEvKqp::TEvQueryRequest> ev) { + if (UserToken) { + ev->Record.SetUserToken(UserToken->GetSerializedToken()); + } + + auto& request = *ev->Record.MutableRequest(); + request.SetDatabase(Database); + request.SetAction(action); + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + request.SetQuery(query); + request.SetKeepSession(false); + + return SendKqpGenericQueryRequest(ev.Release(), QueryServiceConfig.GetScriptResultRowsLimit(), QueryServiceConfig.GetScriptResultSizeLimit(), + [] (TPromise<TQueryResult> promise, TEvKqp::TEvQueryResponse&& responseEv) { + TQueryResult queryResult; + queryResult.ProtobufArenaPtr.reset(new google::protobuf::Arena()); + KqpResponseToQueryResult(responseEv.Record.GetRef(), queryResult); + promise.SetValue(std::move(queryResult)); + }); + } + bool CheckCluster(const TString& cluster) { return cluster == Cluster; } @@ -2252,16 +2411,17 @@ private: TAlignedPagePoolCounters AllocCounters; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; std::shared_ptr<IKqpTableMetadataLoader> MetadataLoader; + NKikimrConfig::TQueryServiceConfig QueryServiceConfig; }; } // namespace TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, - ui32 nodeId, TKqpRequestCounters::TPtr counters) + ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) { return MakeIntrusive<TKikimrIcGateway>(cluster, queryType, database, std::move(metadataLoader), actorSystem, nodeId, - counters); + counters, queryServiceConfig); } bool SplitTablePath(const TString& tableName, const TString& database, std::pair<TString, TString>& pathPair, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 451f4a5bce..e25229efdd 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -16,7 +16,9 @@ #include <ydb/library/yql/providers/common/codec/yql_codec.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h> +#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h> #include <ydb/library/yql/providers/generic/provider/yql_generic_state.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> @@ -715,6 +717,7 @@ public: auto queryAstStr = SerializeExpr(ctx, *query); + bool useGenericQuery = ShouldUseGenericQuery(dataQueryBlocks); bool useScanQuery = ShouldUseScanQuery(dataQueryBlocks, settings); IKqpGateway::TAstQuerySettings querySettings; @@ -723,7 +726,16 @@ public: TFuture<TQueryResult> future; switch (queryType) { case EKikimrQueryType::YqlScript: - if (useScanQuery) { + if (useGenericQuery) { + Ydb::Table::TransactionSettings txSettings; + txSettings.mutable_serializable_read_write(); + if (SessionCtx->Query().PrepareOnly) { + future = Gateway->ExplainGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText()); + } else { + future = Gateway->ExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query), + querySettings, txSettings); + } + } else if (useScanQuery) { ui64 rowsLimit = 0; if (dataQueryBlocks.ArgCount() && !dataQueryBlocks.Arg(0).Results().Empty()) { const auto& queryBlock = dataQueryBlocks.Arg(0); @@ -799,6 +811,24 @@ private: return result; } + bool ShouldUseGenericQuery(const TKiDataQueryBlocks& queryBlocks) { + const auto& queryBlock = queryBlocks.Arg(0); + + bool hasFederatedSorcesOrSinks = false; + VisitExpr(queryBlock.Ptr(), [&hasFederatedSorcesOrSinks](const TExprNode::TPtr& exprNode) { + auto node = TExprBase(exprNode); + + hasFederatedSorcesOrSinks = hasFederatedSorcesOrSinks + || node.Maybe<TS3DataSource>() + || node.Maybe<TS3DataSink>() + || node.Maybe<TGenDataSource>(); + + return !hasFederatedSorcesOrSinks; + }); + + return hasFederatedSorcesOrSinks; + } + bool ShouldUseScanQuery(const TKiDataQueryBlocks& queryBlocks, const TExecuteSettings& settings) { if (settings.UseScanQuery) { return *settings.UseScanQuery; @@ -1351,6 +1381,7 @@ private: SessionCtx->Query().Deadlines = settings.Deadlines; SessionCtx->Query().StatsMode = settings.StatsMode; SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + SessionCtx->Query().PreparingQuery->SetText(script.Text); SessionCtx->Query().PreparedQuery.reset(); TMaybe<TSqlVersion> sqlVersion; @@ -1395,6 +1426,7 @@ private: SessionCtx->Query().PrepareOnly = true; SessionCtx->Query().SuppressDdlChecks = true; SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + SessionCtx->Query().PreparingQuery->SetText(script.Text); SessionCtx->Query().PreparedQuery.reset(); TMaybe<TSqlVersion> sqlVersion; @@ -1420,6 +1452,7 @@ private: SessionCtx->Query().PrepareOnly = true; SessionCtx->Query().SuppressDdlChecks = true; SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + SessionCtx->Query().PreparingQuery->SetText(script.Text); TMaybe<TSqlVersion> sqlVersion; auto scriptExpr = CompileYqlQuery(script, true, true, ctx, sqlVersion, {}); @@ -1492,7 +1525,9 @@ private: TypesCtx->AddDataSource(providerNames, kikimrDataSource); TypesCtx->AddDataSink(providerNames, kikimrDataSink); - if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) { + bool addExternalDataSources = queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query + || queryType == EKikimrQueryType::YqlScript && AppData()->FeatureFlags.GetEnableExternalDataSources(); + if (addExternalDataSources && FederatedQuerySetup) { InitS3Provider(queryType); InitGenericProvider(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index e84b9a2691..30c06b8e73 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -75,7 +75,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) { std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false); return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), - server.GetRuntime()->GetNodeId(0), counters); + server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig.GetQueryServiceConfig()); } void TestListPathCommon(TIntrusivePtr<IKikimrGateway> gateway) { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index cef3ffa7a3..537aef1ac2 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1418,7 +1418,7 @@ private: auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, MetadataProviderConfig); + IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration(), pgWire); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 3fd7c1e63f..8e770e7c59 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -153,6 +153,7 @@ public: std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) : Owner(owner) , SessionId(sessionId) @@ -164,6 +165,7 @@ public: , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) + , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) { RequestCounters = MakeIntrusive<TKqpRequestCounters>(); @@ -225,7 +227,7 @@ public: void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { if (!WorkerId) { std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, - FederatedQuerySetup, ModuleResolverState, Counters, MetadataProviderConfig)); + FederatedQuerySetup, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig)); WorkerId = RegisterWithSameMailbox(workerActor.release()); } TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie, @@ -2208,6 +2210,7 @@ private: TKqpTempTablesState TempTablesState; + NKikimrConfig::TQueryServiceConfig QueryServiceConfig; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; std::shared_ptr<std::atomic<bool>> CompilationCookie; }; @@ -2219,11 +2222,12 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) { return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, std::move(asyncIoFactory), std::move(moduleResolverState), counters, - metadataProviderConfig + queryServiceConfig, metadataProviderConfig ); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 463ca7dd9b..77920f8164 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -37,6 +37,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig ); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index d84f9883b4..0bffab8dba 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -98,7 +98,7 @@ public: TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + const TQueryServiceConfig& queryServiceConfig, const TMetadataProviderConfig& metadataProviderConfig ) : Owner(owner) , SessionId(sessionId) @@ -107,6 +107,7 @@ public: , ModuleResolverState(moduleResolverState) , Counters(counters) , Config(MakeIntrusive<TKikimrConfiguration>()) + , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , CreationTime(TInstant::Now()) , QueryId(0) @@ -181,7 +182,7 @@ public: std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader), - ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters); + ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig); Config->FeatureFlags = AppData(ctx)->FeatureFlags; @@ -1080,7 +1081,8 @@ private: TIntrusivePtr<TKqpCounters> Counters; TIntrusivePtr<TKqpRequestCounters> RequestCounters; TKikimrConfiguration::TPtr Config; - NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; + TQueryServiceConfig QueryServiceConfig; + TMetadataProviderConfig MetadataProviderConfig; TInstant CreationTime; TIntrusivePtr<IKqpGateway> Gateway; TIntrusivePtr<IKqpHost> KqpHost; @@ -1096,11 +1098,11 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + const TQueryServiceConfig& queryServiceConfig, const TMetadataProviderConfig& metadataProviderConfig ) { return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, - moduleResolverState, counters, metadataProviderConfig); + moduleResolverState, counters, queryServiceConfig, metadataProviderConfig); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h index b3fe27dbb3..3e7538c27e 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.h +++ b/ydb/core/kqp/session_actor/kqp_worker_common.h @@ -148,6 +148,7 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig ); diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index c0da673945..ba424a41ed 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -4,6 +4,7 @@ #include <ydb/core/kqp/ut/federated_query/common/common.h> #include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> @@ -1275,6 +1276,71 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); } + + TString CreateSimpleGenericQuery(std::shared_ptr<TKikimrRunner> kikimr, const TString& bucket) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString object = "test_object"; + const TString content = "key\n1"; + + CreateBucketWithObject(bucket, object, content); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + return fmt::format(R"( + SELECT * FROM `{external_source}`.`/` WITH ( + FORMAT="csv_with_names", + SCHEMA ( + key Int NOT NULL + ) + ) + )", "external_source"_a=externalDataSourceName); + } + + Y_UNIT_TEST(ExecuteScriptWithGenericAutoDetection) { + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_execute_generic_auto_detection"); + + auto driver = kikimr->GetDriver(); + NScripting::TScriptingClient yqlScriptClient(driver); + + auto scriptResult = yqlScriptClient.ExecuteYqlScript(sql).GetValueSync(); + UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); + + TResultSetParser resultSet(scriptResult.GetResultSet(0)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("key").GetInt32(), 1); + } + + Y_UNIT_TEST(ExplainScriptWithGenericAutoDetection) { + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_explain_generic_auto_detection"); + + auto driver = kikimr->GetDriver(); + NScripting::TScriptingClient yqlScriptClient(driver); + + NScripting::TExplainYqlRequestSettings settings; + settings.Mode(NScripting::ExplainYqlRequestMode::Plan); + + auto scriptResult = yqlScriptClient.ExplainYqlScript(sql, settings).GetValueSync(); + UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); + UNIT_ASSERT(scriptResult.GetPlan()); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 8b97466dfb..7430cc5bd6 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -36,7 +36,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) { counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false); return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), - server.GetRuntime()->GetNodeId(0), counters); + server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig.GetQueryServiceConfig()); } TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> gateway, |