aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-06-29 14:56:31 +0300
committerspuchin <spuchin@ydb.tech>2023-06-29 14:56:31 +0300
commit31a5ed6d496c974b10620ca78d2e33e117e433e8 (patch)
tree76729a3c0e3031ffd12dbb06df440fc889185dcf
parentee56ba144faaa6e36e9a6f07cebab55aefbba9f7 (diff)
downloadydb-31a5ed6d496c974b10620ca78d2e33e117e433e8.tar.gz
Implement concurrent_result_sets setting for GenericQuery. ()
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp6
-rw-r--r--ydb/core/kqp/common/kqp_timeouts.cpp1
-rw-r--r--ydb/core/kqp/common/simple/helpers.cpp1
-rw-r--r--ydb/core/kqp/common/simple/query_id.cpp1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp5
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp2
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_host.h1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp17
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h3
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp4
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp56
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp9
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.h1
18 files changed, 101 insertions, 16 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 6fedd688690..fbc118017ce 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -257,9 +257,13 @@ private:
}
}
+ auto queryType = req->concurrent_result_sets()
+ ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
+ : NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY;
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
queryAction,
- NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY,
+ queryType,
SelfId(),
Request_,
"", // sessionId
diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp
index d5e55838877..7124e119435 100644
--- a/ydb/core/kqp/common/kqp_timeouts.cpp
+++ b/ydb/core/kqp/common/kqp_timeouts.cpp
@@ -16,6 +16,7 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return queryLimits.GetDataQueryTimeoutMs();
diff --git a/ydb/core/kqp/common/simple/helpers.cpp b/ydb/core/kqp/common/simple/helpers.cpp
index 11c06c9a3f1..f42c63fa4ed 100644
--- a/ydb/core/kqp/common/simple/helpers.cpp
+++ b/ydb/core/kqp/common/simple/helpers.cpp
@@ -10,6 +10,7 @@ bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) {
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return true;
diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp
index a05d22e71f4..2403394c5d7 100644
--- a/ydb/core/kqp/common/simple/query_id.cpp
+++ b/ydb/core/kqp/common/simple/query_id.cpp
@@ -25,6 +25,7 @@ TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
break;
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 7b1697daa84..a79f5a096b2 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -141,6 +141,11 @@ public:
break;
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ prepareSettings.ConcurrentResults = false;
+ AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings);
+ break;
+
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings);
break;
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 6d352b049b2..8efd5f0b860 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -105,6 +105,8 @@ void TKqpCountersBase::Init() {
KqpGroup->GetCounter("Request/QueryTypeAstScan", true);
QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY] =
KqpGroup->GetCounter("Request/QueryTypeGenericQuery", true);
+ QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY] =
+ KqpGroup->GetCounter("Request/QueryTypeGenericConcurrentQuery", true);
QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_SCRIPT] =
KqpGroup->GetCounter("Request/QueryTypeGenericScript", true);
OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true);
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index e2a37754bc8..566fa324311 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1329,6 +1329,10 @@ private:
if (settings.IsInternalCall) {
SessionCtx->Query().IsInternalCall = *settings.IsInternalCall;
}
+ if (settings.ConcurrentResults) {
+ YQL_ENSURE(*settings.ConcurrentResults || queryType == EKikimrQueryType::Query);
+ SessionCtx->Query().ConcurrentResults = *settings.ConcurrentResults;
+ }
TMaybe<TSqlVersion> sqlVersion = settings.SyntaxVersion;
if (!sqlVersion) {
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index 799c929bd58..c82aed9c78f 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -33,6 +33,7 @@ public:
TMaybe<bool> IsInternalCall;
TMaybe<bool> UsePgParser;
TMaybe<TSqlVersion> SyntaxVersion;
+ TMaybe<bool> ConcurrentResults;
TString ToString() const {
return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " IsInternalCall: " << IsInternalCall << " }";
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
index 47616fcb3c5..6265d85b0d5 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
@@ -60,7 +60,7 @@ TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<
if (auto maybeDatasink = node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) {
auto cluster = TString(maybeDatasink.Cast().Cluster());
- ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr(), types);
+ ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr(), types, sessionCtx->Query().ConcurrentResults);
if (ret != inputNode) {
return ret;
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 2d3ca7e9be5..c50913c001b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -82,6 +82,8 @@ struct TKiExploreTxResults {
bool HasUncommittedChangesRead = false;
};
+ bool ConcurrentResults = true;
+
THashSet<const TExprNode*> Ops;
TVector<TExprBase> Sync;
TVector<TKiQueryBlock> QueryBlocks;
@@ -245,6 +247,10 @@ struct TKiExploreTxResults {
AddQueryBlock();
}
+ if (!ConcurrentResults && QueryBlocks.back().Results.size() > 0) {
+ AddQueryBlock();
+ }
+
auto& curBlock = QueryBlocks.back();
curBlock.Results.push_back(result);
}
@@ -323,7 +329,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
for (const auto& dataSource : types.DataSources) {
- if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration
+ if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration
&& dqIntegration->CanRead(*node.Ptr(), ctx)
&& dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) {
txRes.Ops.insert(node.Raw());
@@ -718,7 +724,8 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor
} // namespace
-TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) {
+TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData,
+ TTypeAnnotationContext& types, bool concurrentResults) {
if (!node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) {
return node.Ptr();
}
@@ -728,6 +735,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
auto kiDataSink = commit.DataSink().Cast<TKiDataSink>();
TKiExploreTxResults txExplore;
+ txExplore.ConcurrentResults = concurrentResults;
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types)) {
return node.Ptr();
}
@@ -793,13 +801,14 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
.Done()
.Ptr();
+ int resultIndex = 0;
for (auto& block : txExplore.QueryBlocks) {
for (size_t i = 0; i < block.Results.size(); ++i) {
auto result = block.Results[i].Cast<TResWriteBase>();
auto extractValue = Build<TCoNth>(ctx, node.Pos())
.Tuple(execRight)
- .Index().Build(i)
+ .Index().Build(resultIndex)
.Done()
.Ptr();
@@ -818,6 +827,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
.DataSink<TResultDataSink>()
.Build()
.Done();
+
+ ++resultIndex;
}
}
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 488813c6e90..0e09dfa4961 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -105,8 +105,8 @@ struct TKikimrQueryContext : TThrRefBase {
// Operations on document API tables are performed in restricted mode by default,
// full mode can be enabled explicitly.
bool DocumentApiRestricted = true;
-
bool IsInternalCall = false;
+ bool ConcurrentResults = true;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index b742e9927f2..4adb83d704e 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -201,7 +201,8 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat
TVector<NKqpProto::TKqpTableInfo>& infos);
// Optimizer rules
-TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types);
+TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData,
+ TTypeAnnotationContext& types, bool sequentialResults);
TExprNode::TPtr KiBuildResult(NNodes::TExprBase node, const TString& cluster, TExprContext& ctx);
const THashSet<TStringBuf>& KikimrDataSourceFunctions();
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index d086bc3c2d3..5d3cf7502af 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -379,6 +379,7 @@ public:
type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT
);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 0e797f007aa..ad8eef87651 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -285,6 +285,7 @@ public:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return true;
@@ -401,6 +402,7 @@ public:
case NKikimrKqp::QUERY_ACTION_EXPLAIN: {
auto type = QueryState->GetType();
if (type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY &&
+ type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY &&
type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT)
{
return ForwardRequest(ev);
@@ -675,6 +677,7 @@ public:
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT
);
break;
@@ -1188,6 +1191,7 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: {
TString text = QueryState->ExtractQueryText();
if (IsQueryAllowedToLog(text)) {
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index b2d5c2e464e..b001ed94567 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -137,12 +137,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
+ auto settings = TExecuteQuerySettings()
+ .StatsMode(EStatsMode::Basic);
+
auto result = db.ExecuteQuery(R"(
SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+
CompareYson(R"([
[[1];[202u];["Value2"]];
[[1];[502u];["Value2"]];
@@ -167,6 +173,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(1)));
}
+ Y_UNIT_TEST(StreamExecuteQueryMultiResult) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto it = db.StreamExecuteQuery(R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT 2;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 lastResultSetIndex = 0;
+ ui64 count = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ if (streamPart.GetResultSetIndex() != lastResultSetIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(streamPart.GetResultSetIndex(), lastResultSetIndex + 1);
+ ++lastResultSetIndex;
+ }
+
+ auto resultSet = streamPart.ExtractResultSet();
+ count += resultSet.RowsCount();
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(count, 7);
+ }
+
Y_UNIT_TEST(ExecuteQueryWrite) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
@@ -351,7 +391,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", settings).ExtractValueSync();
UNIT_ASSERT_EQUAL(scriptExecutionOperation.Metadata().ExecMode, EExecMode::Explain);
-
+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecMode, EExecMode::Explain);
@@ -372,12 +412,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto scriptExecutionOperation = db.ExecuteScript(R"(
SELECT 42
)", settings).ExtractValueSync();
-
+
// TODO: change when parse mode will be supported
UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus());
UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1);
UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetIssues().back().GetMessage(), "Query mode is not supported yet", scriptExecutionOperation.Status().GetIssues().ToString());
-
+
}
Y_UNIT_TEST(ValidateScript) {
@@ -388,7 +428,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto scriptExecutionOperation = db.ExecuteScript(R"(
SELECT 42
)", settings).ExtractValueSync();
-
+
// TODO: change when validate mode will be supported
UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus());
UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1);
@@ -403,7 +443,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto scriptExecutionOperation = db.ExecuteScript(R"(
SELECT 42
)", settings).ExtractValueSync();
-
+
UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus());
UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1);
UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetIssues().back().GetMessage(), "Query mode is not specified", scriptExecutionOperation.Status().GetIssues().ToString());
@@ -500,7 +540,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
for (const auto& op : list.GetList()) {
UNIT_ASSERT_C(listedOpsAfterForget.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId);
}
-
+
UNIT_ASSERT_EQUAL(rememberedOps, listedOpsAfterForget);
}
@@ -525,7 +565,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS, forgetStatus.GetIssues().ToString());
forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString());
-
+
}
Y_UNIT_TEST(ForgetScriptExecutionRace) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index c8cbec780f1..f52e14fae1d 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -40,6 +40,7 @@ enum EQueryType {
QUERY_TYPE_SQL_GENERIC_QUERY = 12;
QUERY_TYPE_SQL_GENERIC_SCRIPT = 13;
+ QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY = 14;
};
enum EQueryAction {
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
index 8cb5e7e60f1..a511c0ef42a 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
@@ -190,6 +190,10 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
request.mutable_query_content()->set_text(query);
request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_));
+ if (settings.ConcurrentResultSets_) {
+ request.set_concurrent_result_sets(*settings.ConcurrentResultSets_);
+ }
+
if (txControl.HasTx()) {
auto requestTxControl = request.mutable_tx_control();
requestTxControl->set_commit_tx(txControl.CommitTx_);
@@ -256,7 +260,10 @@ TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRp
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
const TMaybe<TParams>& params, const TExecuteQuerySettings& settings)
{
- return StreamExecuteQuery(connections, driverState, query, txControl, params, settings)
+ auto syncSettings = settings;
+ syncSettings.ConcurrentResultSets(true);
+
+ return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings)
.Apply([](TAsyncExecuteQueryIterator itFuture){
auto it = itFuture.ExtractValue();
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
index 51d57f82878..d07a96a94b3 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
@@ -98,6 +98,7 @@ struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> {
FLUENT_SETTING_DEFAULT(ESyntax, Syntax, ESyntax::YqlV1);
FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute);
FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None);
+ FLUENT_SETTING_OPTIONAL(bool, ConcurrentResultSets);
};
class TExecuteQueryResult : public TStatus {