aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@ydb.tech>2023-12-07 13:57:11 +0300
committergrigoriypisar <grigoriypisar@ydb.tech>2023-12-07 14:32:33 +0300
commitbb46cc0930549897180c81654d7dca4e40a93908 (patch)
treefa84c55792f56d328df275c4166d9069f402f851
parentd014e0f31449a0b13f21906969251d9624c3a62f (diff)
downloadydb-bb46cc0930549897180c81654d7dca4e40a93908.tar.gz
generic query auto detection
Added generic query detection
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp3
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h8
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp168
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp39
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp12
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.h1
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp66
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp2
13 files changed, 299 insertions, 21 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 513fa9501f..fb5982549a 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -63,6 +63,7 @@ public:
, UserToken(userToken)
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
+ , QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, CompilationTimeout(TDuration::MilliSeconds(tableServiceConfig.GetCompileTimeoutMs()))
, UserRequestContext(userRequestContext)
@@ -80,8 +81,8 @@ public:
ApplyServiceConfig(*Config, tableServiceConfig);
- if (QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT) {
- ui32 scriptResultRowsLimit = queryServiceConfig.GetScriptResultRowsLimit();
+ if (QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT || QueryId.Settings.QueryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
+ ui32 scriptResultRowsLimit = QueryServiceConfig.GetScriptResultRowsLimit();
if (scriptResultRowsLimit > 0) {
Config->_ResultRowsLimit = scriptResultRowsLimit;
} else {
@@ -177,7 +178,7 @@ private:
std::make_shared<TKqpTableMetadataLoader>(
TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
- ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters);
+ ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
@@ -455,6 +456,7 @@ private:
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TKqpDbCountersPtr DbCounters;
TKikimrConfiguration::TPtr Config;
+ TQueryServiceConfig QueryServiceConfig;
TMetadataProviderConfig MetadataProviderConfig;
TDuration CompilationTimeout;
TInstant StartTime;
diff --git a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
index f73266006d..7748e7411a 100644
--- a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
+++ b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
@@ -42,7 +42,8 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) {
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem();
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(),
- actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0), TAlignedPagePoolCounters());
+ actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0),
+ TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig.GetQueryServiceConfig());
}
[[maybe_unused]]
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 68f69929b0..3e4477bb7e 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -195,11 +195,17 @@ public:
virtual NThreading::TFuture<TQueryResult> StreamExecScanQueryAst(const TString& cluster, const TString& query,
TQueryData::TPtr, const TAstQuerySettings& settings, const NActors::TActorId& target,
std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0;
+
+ virtual NThreading::TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query,
+ TQueryData::TPtr params, const TAstQuerySettings& settings,
+ const Ydb::Table::TransactionSettings& txSettings) = 0;
+
+ virtual NThreading::TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) = 0;
};
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem,
- ui32 nodeId, TKqpRequestCounters::TPtr counters);
+ ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig());
bool SplitTablePath(const TString& tableName, const TString& database, std::pair<TString, TString>& pathPair,
TString& error, bool createDir);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index a6003e43de..bcf3a2023d 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -427,6 +427,107 @@ private:
TVector<NYql::NDqProto::TDqExecutionStats> Executions;
};
+class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
+ TKqpGenericQueryRequestHandler,
+ TEvKqp::TEvQueryRequest,
+ TEvKqp::TEvQueryResponse,
+ IKqpGateway::TQueryResult>
+{
+ struct TResultSetDescription {
+ Ydb::ResultSet ResultSet;
+ ui64 RowCount = 0;
+ ui64 ByteCount = 0;
+ bool Initialized = false;
+ };
+
+public:
+ using TRequest = TEvKqp::TEvQueryRequest;
+ using TResponse = TEvKqp::TEvQueryResponse;
+ using TResult = IKqpGateway::TQueryResult;
+
+ using TBase = TKqpGenericQueryRequestHandler::TBase;
+ using TCallbackFunc = TBase::TCallbackFunc;
+
+ TKqpGenericQueryRequestHandler(TRequest* request, ui64 rowsLimit, ui64 sizeLimit, TPromise<TResult> promise, TCallbackFunc callback)
+ : TBase(request, promise, callback)
+ , RowsLimit(rowsLimit)
+ , SizeLimit(sizeLimit)
+ {}
+
+ void Bootstrap() {
+ ActorIdToProto(SelfId(), Request->Record.MutableRequestActorId());
+ Send(MakeKqpProxyID(SelfId().NodeId()), Request.Release());
+ Become(&TKqpGenericQueryRequestHandler::AwaitState);
+ }
+
+ void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+
+ const ui32 resultSetId = record.GetQueryResultIndex();
+ if (resultSetId >= ResultSets.size()) {
+ ResultSets.resize(resultSetId + 1);
+ }
+
+ if (!ResultSets[resultSetId].Initialized) {
+ ResultSets[resultSetId].Initialized = true;
+ for (auto& column : *record.MutableResultSet()->mutable_columns()) {
+ ResultSets[resultSetId].ResultSet.add_columns()->CopyFrom(std::move(column));
+ }
+ }
+
+ if (!ResultSets[resultSetId].ResultSet.truncated()) {
+ for (auto& row : *record.MutableResultSet()->mutable_rows()) {
+ if (RowsLimit && ResultSets[resultSetId].RowCount + 1 > RowsLimit) {
+ ResultSets[resultSetId].ResultSet.set_truncated(true);
+ break;
+ }
+
+ auto serializedSize = row.ByteSizeLong();
+ if (SizeLimit && ResultSets[resultSetId].ByteCount + serializedSize > SizeLimit) {
+ ResultSets[resultSetId].ResultSet.set_truncated(true);
+ break;
+ }
+
+ ResultSets[resultSetId].RowCount++;
+ ResultSets[resultSetId].ByteCount += serializedSize;
+ *ResultSets[resultSetId].ResultSet.add_rows() = std::move(row);
+ }
+ }
+
+ auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max());
+ Send(ev->Sender, response.Release());
+ }
+
+ using TBase::HandleResponse;
+
+ void HandleResponse(TResponse::TPtr &ev, const TActorContext &ctx) {
+ auto& response = *ev->Get()->Record.GetRef().MutableResponse();
+
+ for (auto& resultSet : ResultSets) {
+ ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
+ }
+
+ TBase::HandleResponse(ev, ctx);
+ }
+
+ STFUNC(AwaitState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqpExecuter::TEvStreamData, Handle);
+ HFunc(TEvKqp::TEvQueryResponse, HandleResponse);
+
+ default:
+ TBase::HandleUnexpectedEvent("TKqpGenericQueryRequestHandler", ev->GetTypeRewrite());
+ }
+ }
+
+private:
+ std::vector<TResultSetDescription> ResultSets;
+ const ui64 RowsLimit;
+ const ui64 SizeLimit;
+};
+
class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExecuterRequestHandler> {
public:
using TResult = IKqpGateway::TGenericResult;
@@ -597,14 +698,15 @@ private:
public:
TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader,
- TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters)
+ TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig)
: Cluster(cluster)
, QueryType(queryType)
, Database(database)
, ActorSystem(actorSystem)
, NodeId(nodeId)
, Counters(counters)
- , MetadataLoader(std::move(metadataLoader)) {}
+ , MetadataLoader(std::move(metadataLoader))
+ , QueryServiceConfig(queryServiceConfig) {}
bool HasCluster(const TString& cluster) override {
return cluster == Cluster;
@@ -2000,6 +2102,31 @@ public:
});
}
+ TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
+ const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
+ {
+ YQL_ENSURE(cluster == Cluster);
+
+ auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
+
+ auto& request = *ev->Record.MutableRequest();
+ request.SetCollectStats(settings.CollectStats);
+
+ FillParameters(std::move(params), request.MutableYdbParameters());
+
+ auto& txControl = *request.MutableTxControl();
+ txControl.mutable_begin_tx()->CopyFrom(txSettings);
+ txControl.set_commit_tx(true);
+
+ return RunGenericQuery(query, NKikimrKqp::QUERY_ACTION_EXECUTE, std::move(ev));
+ }
+
+ TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) override {
+ YQL_ENSURE(cluster == Cluster);
+
+ return RunGenericQuery(query, NKikimrKqp::QUERY_ACTION_EXPLAIN, MakeHolder<TEvKqp::TEvQueryRequest>());
+ }
+
private:
using TDescribeSchemeResponse = TEvSchemeShard::TEvDescribeSchemeResult;
using TTransactionResponse = TEvTxUserProxy::TEvProposeTransactionStatus;
@@ -2065,6 +2192,17 @@ private:
return promise.GetFuture();
}
+ TFuture<TQueryResult> SendKqpGenericQueryRequest(TEvKqp::TEvQueryRequest* request, ui64 rowsLimit, ui64 sizeLimit,
+ TKqpGenericQueryRequestHandler::TCallbackFunc callback)
+ {
+ auto promise = NewPromise<TQueryResult>();
+ IActor* requestHandler = new TKqpGenericQueryRequestHandler(request, rowsLimit, sizeLimit,
+ promise, callback);
+ RegisterActor(requestHandler);
+
+ return promise.GetFuture();
+ }
+
template<typename TRequest, typename TResponse, typename TResult>
TFuture<TResult> SendActorRequest(const TActorId& actorId, TRequest* request,
typename TActorRequestHandler<TRequest, TResponse, TResult>::TCallbackFunc callback)
@@ -2101,6 +2239,27 @@ private:
});
}
+ TFuture<TQueryResult> RunGenericQuery(const TString& query, NKikimrKqp::EQueryAction action, THolder<TEvKqp::TEvQueryRequest> ev) {
+ if (UserToken) {
+ ev->Record.SetUserToken(UserToken->GetSerializedToken());
+ }
+
+ auto& request = *ev->Record.MutableRequest();
+ request.SetDatabase(Database);
+ request.SetAction(action);
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ request.SetQuery(query);
+ request.SetKeepSession(false);
+
+ return SendKqpGenericQueryRequest(ev.Release(), QueryServiceConfig.GetScriptResultRowsLimit(), QueryServiceConfig.GetScriptResultSizeLimit(),
+ [] (TPromise<TQueryResult> promise, TEvKqp::TEvQueryResponse&& responseEv) {
+ TQueryResult queryResult;
+ queryResult.ProtobufArenaPtr.reset(new google::protobuf::Arena());
+ KqpResponseToQueryResult(responseEv.Record.GetRef(), queryResult);
+ promise.SetValue(std::move(queryResult));
+ });
+ }
+
bool CheckCluster(const TString& cluster) {
return cluster == Cluster;
}
@@ -2252,16 +2411,17 @@ private:
TAlignedPagePoolCounters AllocCounters;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
std::shared_ptr<IKqpTableMetadataLoader> MetadataLoader;
+ NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
};
} // namespace
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem,
- ui32 nodeId, TKqpRequestCounters::TPtr counters)
+ ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig)
{
return MakeIntrusive<TKikimrIcGateway>(cluster, queryType, database, std::move(metadataLoader), actorSystem, nodeId,
- counters);
+ counters, queryServiceConfig);
}
bool SplitTablePath(const TString& tableName, const TString& database, std::pair<TString, TString>& pathPair,
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 451f4a5bce..e25229efdd 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -16,7 +16,9 @@
#include <ydb/library/yql/providers/common/codec/yql_codec.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
@@ -715,6 +717,7 @@ public:
auto queryAstStr = SerializeExpr(ctx, *query);
+ bool useGenericQuery = ShouldUseGenericQuery(dataQueryBlocks);
bool useScanQuery = ShouldUseScanQuery(dataQueryBlocks, settings);
IKqpGateway::TAstQuerySettings querySettings;
@@ -723,7 +726,16 @@ public:
TFuture<TQueryResult> future;
switch (queryType) {
case EKikimrQueryType::YqlScript:
- if (useScanQuery) {
+ if (useGenericQuery) {
+ Ydb::Table::TransactionSettings txSettings;
+ txSettings.mutable_serializable_read_write();
+ if (SessionCtx->Query().PrepareOnly) {
+ future = Gateway->ExplainGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText());
+ } else {
+ future = Gateway->ExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
+ querySettings, txSettings);
+ }
+ } else if (useScanQuery) {
ui64 rowsLimit = 0;
if (dataQueryBlocks.ArgCount() && !dataQueryBlocks.Arg(0).Results().Empty()) {
const auto& queryBlock = dataQueryBlocks.Arg(0);
@@ -799,6 +811,24 @@ private:
return result;
}
+ bool ShouldUseGenericQuery(const TKiDataQueryBlocks& queryBlocks) {
+ const auto& queryBlock = queryBlocks.Arg(0);
+
+ bool hasFederatedSorcesOrSinks = false;
+ VisitExpr(queryBlock.Ptr(), [&hasFederatedSorcesOrSinks](const TExprNode::TPtr& exprNode) {
+ auto node = TExprBase(exprNode);
+
+ hasFederatedSorcesOrSinks = hasFederatedSorcesOrSinks
+ || node.Maybe<TS3DataSource>()
+ || node.Maybe<TS3DataSink>()
+ || node.Maybe<TGenDataSource>();
+
+ return !hasFederatedSorcesOrSinks;
+ });
+
+ return hasFederatedSorcesOrSinks;
+ }
+
bool ShouldUseScanQuery(const TKiDataQueryBlocks& queryBlocks, const TExecuteSettings& settings) {
if (settings.UseScanQuery) {
return *settings.UseScanQuery;
@@ -1351,6 +1381,7 @@ private:
SessionCtx->Query().Deadlines = settings.Deadlines;
SessionCtx->Query().StatsMode = settings.StatsMode;
SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ SessionCtx->Query().PreparingQuery->SetText(script.Text);
SessionCtx->Query().PreparedQuery.reset();
TMaybe<TSqlVersion> sqlVersion;
@@ -1395,6 +1426,7 @@ private:
SessionCtx->Query().PrepareOnly = true;
SessionCtx->Query().SuppressDdlChecks = true;
SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ SessionCtx->Query().PreparingQuery->SetText(script.Text);
SessionCtx->Query().PreparedQuery.reset();
TMaybe<TSqlVersion> sqlVersion;
@@ -1420,6 +1452,7 @@ private:
SessionCtx->Query().PrepareOnly = true;
SessionCtx->Query().SuppressDdlChecks = true;
SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ SessionCtx->Query().PreparingQuery->SetText(script.Text);
TMaybe<TSqlVersion> sqlVersion;
auto scriptExpr = CompileYqlQuery(script, true, true, ctx, sqlVersion, {});
@@ -1492,7 +1525,9 @@ private:
TypesCtx->AddDataSource(providerNames, kikimrDataSource);
TypesCtx->AddDataSink(providerNames, kikimrDataSink);
- if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) {
+ bool addExternalDataSources = queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query
+ || queryType == EKikimrQueryType::YqlScript && AppData()->FeatureFlags.GetEnableExternalDataSources();
+ if (addExternalDataSources && FederatedQuerySetup) {
InitS3Provider(queryType);
InitGenericProvider();
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index e84b9a2691..30c06b8e73 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -75,7 +75,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
- server.GetRuntime()->GetNodeId(0), counters);
+ server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig.GetQueryServiceConfig());
}
void TestListPathCommon(TIntrusivePtr<IKikimrGateway> gateway) {
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index cef3ffa7a3..537aef1ac2 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -1418,7 +1418,7 @@ private:
auto config = CreateConfig(KqpSettings, workerSettings);
- IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, MetadataProviderConfig);
+ IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration(), pgWire);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 3fd7c1e63f..8e770e7c59 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -153,6 +153,7 @@ public:
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
: Owner(owner)
, SessionId(sessionId)
@@ -164,6 +165,7 @@ public:
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
, Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
+ , QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
{
RequestCounters = MakeIntrusive<TKqpRequestCounters>();
@@ -225,7 +227,7 @@ public:
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!WorkerId) {
std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
- FederatedQuerySetup, ModuleResolverState, Counters, MetadataProviderConfig));
+ FederatedQuerySetup, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig));
WorkerId = RegisterWithSameMailbox(workerActor.release());
}
TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie,
@@ -2208,6 +2210,7 @@ private:
TKqpTempTablesState TempTablesState;
+ NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
std::shared_ptr<std::atomic<bool>> CompilationCookie;
};
@@ -2219,11 +2222,12 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
{
return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
std::move(asyncIoFactory), std::move(moduleResolverState), counters,
- metadataProviderConfig
+ queryServiceConfig, metadataProviderConfig
);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index 463ca7dd9b..77920f8164 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -37,6 +37,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
);
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index d84f9883b4..0bffab8dba 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -98,7 +98,7 @@ public:
TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ const TQueryServiceConfig& queryServiceConfig, const TMetadataProviderConfig& metadataProviderConfig
)
: Owner(owner)
, SessionId(sessionId)
@@ -107,6 +107,7 @@ public:
, ModuleResolverState(moduleResolverState)
, Counters(counters)
, Config(MakeIntrusive<TKikimrConfiguration>())
+ , QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, CreationTime(TInstant::Now())
, QueryId(0)
@@ -181,7 +182,7 @@ public:
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader),
- ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters);
+ ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig);
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
@@ -1080,7 +1081,8 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<TKqpRequestCounters> RequestCounters;
TKikimrConfiguration::TPtr Config;
- NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
+ TQueryServiceConfig QueryServiceConfig;
+ TMetadataProviderConfig MetadataProviderConfig;
TInstant CreationTime;
TIntrusivePtr<IKqpGateway> Gateway;
TIntrusivePtr<IKqpHost> KqpHost;
@@ -1096,11 +1098,11 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ const TQueryServiceConfig& queryServiceConfig, const TMetadataProviderConfig& metadataProviderConfig
)
{
return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
- moduleResolverState, counters, metadataProviderConfig);
+ moduleResolverState, counters, queryServiceConfig, metadataProviderConfig);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h
index b3fe27dbb3..3e7538c27e 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.h
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.h
@@ -148,6 +148,7 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
);
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index c0da673945..ba424a41ed 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/ut/federated_query/common/common.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
@@ -1275,6 +1276,71 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1);
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2);
}
+
+ TString CreateSimpleGenericQuery(std::shared_ptr<TKikimrRunner> kikimr, const TString& bucket) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString object = "test_object";
+ const TString content = "key\n1";
+
+ CreateBucketWithObject(bucket, object, content);
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "location"_a = GetBucketLocation(bucket)
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ return fmt::format(R"(
+ SELECT * FROM `{external_source}`.`/` WITH (
+ FORMAT="csv_with_names",
+ SCHEMA (
+ key Int NOT NULL
+ )
+ )
+ )", "external_source"_a=externalDataSourceName);
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithGenericAutoDetection) {
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_execute_generic_auto_detection");
+
+ auto driver = kikimr->GetDriver();
+ NScripting::TScriptingClient yqlScriptClient(driver);
+
+ auto scriptResult = yqlScriptClient.ExecuteYqlScript(sql).GetValueSync();
+ UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString());
+
+ TResultSetParser resultSet(scriptResult.GetResultSet(0));
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("key").GetInt32(), 1);
+ }
+
+ Y_UNIT_TEST(ExplainScriptWithGenericAutoDetection) {
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_explain_generic_auto_detection");
+
+ auto driver = kikimr->GetDriver();
+ NScripting::TScriptingClient yqlScriptClient(driver);
+
+ NScripting::TExplainYqlRequestSettings settings;
+ settings.Mode(NScripting::ExplainYqlRequestMode::Plan);
+
+ auto scriptResult = yqlScriptClient.ExplainYqlScript(sql, settings).GetValueSync();
+ UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString());
+ UNIT_ASSERT(scriptResult.GetPlan());
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 8b97466dfb..7430cc5bd6 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -36,7 +36,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false);
return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
- server.GetRuntime()->GetNodeId(0), counters);
+ server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig.GetQueryServiceConfig());
}
TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> gateway,