diff options
author | spuchin <spuchin@ydb.tech> | 2023-06-29 14:56:31 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-06-29 14:56:31 +0300 |
commit | 31a5ed6d496c974b10620ca78d2e33e117e433e8 (patch) | |
tree | 76729a3c0e3031ffd12dbb06df440fc889185dcf | |
parent | ee56ba144faaa6e36e9a6f07cebab55aefbba9f7 (diff) | |
download | ydb-31a5ed6d496c974b10620ca78d2e33e117e433e8.tar.gz |
Implement concurrent_result_sets setting for GenericQuery. ()
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_timeouts.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/helpers.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/query_id.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt_build.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 56 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp | 9 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/draft/ydb_query/query.h | 1 |
18 files changed, 101 insertions, 16 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 6fedd688690..fbc118017ce 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -257,9 +257,13 @@ private: } } + auto queryType = req->concurrent_result_sets() + ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY + : NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY; + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( queryAction, - NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, + queryType, SelfId(), Request_, "", // sessionId diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp index d5e55838877..7124e119435 100644 --- a/ydb/core/kqp/common/kqp_timeouts.cpp +++ b/ydb/core/kqp/common/kqp_timeouts.cpp @@ -16,6 +16,7 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return queryLimits.GetDataQueryTimeoutMs(); diff --git a/ydb/core/kqp/common/simple/helpers.cpp b/ydb/core/kqp/common/simple/helpers.cpp index 11c06c9a3f1..f42c63fa4ed 100644 --- a/ydb/core/kqp/common/simple/helpers.cpp +++ b/ydb/core/kqp/common/simple/helpers.cpp @@ -10,6 +10,7 @@ bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) { case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return true; diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp index a05d22e71f4..2403394c5d7 100644 --- a/ydb/core/kqp/common/simple/query_id.cpp +++ b/ydb/core/kqp/common/simple/query_id.cpp @@ -25,6 +25,7 @@ TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: break; diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7b1697daa84..a79f5a096b2 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -141,6 +141,11 @@ public: break; case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + prepareSettings.ConcurrentResults = false; + AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings); + break; + + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings); break; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 6d352b049b2..8efd5f0b860 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -105,6 +105,8 @@ void TKqpCountersBase::Init() { KqpGroup->GetCounter("Request/QueryTypeAstScan", true); QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY] = KqpGroup->GetCounter("Request/QueryTypeGenericQuery", true); + QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY] = + KqpGroup->GetCounter("Request/QueryTypeGenericConcurrentQuery", true); QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_SCRIPT] = KqpGroup->GetCounter("Request/QueryTypeGenericScript", true); OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index e2a37754bc8..566fa324311 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1329,6 +1329,10 @@ private: if (settings.IsInternalCall) { SessionCtx->Query().IsInternalCall = *settings.IsInternalCall; } + if (settings.ConcurrentResults) { + YQL_ENSURE(*settings.ConcurrentResults || queryType == EKikimrQueryType::Query); + SessionCtx->Query().ConcurrentResults = *settings.ConcurrentResults; + } TMaybe<TSqlVersion> sqlVersion = settings.SyntaxVersion; if (!sqlVersion) { diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 799c929bd58..c82aed9c78f 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -33,6 +33,7 @@ public: TMaybe<bool> IsInternalCall; TMaybe<bool> UsePgParser; TMaybe<TSqlVersion> SyntaxVersion; + TMaybe<bool> ConcurrentResults; TString ToString() const { return TStringBuilder() << "TPrepareSettings{ DocumentApiRestricted: " << DocumentApiRestricted << " IsInternalCall: " << IsInternalCall << " }"; diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp index 47616fcb3c5..6265d85b0d5 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp @@ -60,7 +60,7 @@ TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr< if (auto maybeDatasink = node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) { auto cluster = TString(maybeDatasink.Cast().Cluster()); - ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr(), types); + ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr(), types, sessionCtx->Query().ConcurrentResults); if (ret != inputNode) { return ret; diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 2d3ca7e9be5..c50913c001b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -82,6 +82,8 @@ struct TKiExploreTxResults { bool HasUncommittedChangesRead = false; }; + bool ConcurrentResults = true; + THashSet<const TExprNode*> Ops; TVector<TExprBase> Sync; TVector<TKiQueryBlock> QueryBlocks; @@ -245,6 +247,10 @@ struct TKiExploreTxResults { AddQueryBlock(); } + if (!ConcurrentResults && QueryBlocks.back().Results.size() > 0) { + AddQueryBlock(); + } + auto& curBlock = QueryBlocks.back(); curBlock.Results.push_back(result); } @@ -323,7 +329,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } for (const auto& dataSource : types.DataSources) { - if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration + if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration && dqIntegration->CanRead(*node.Ptr(), ctx) && dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) { txRes.Ops.insert(node.Raw()); @@ -718,7 +724,8 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor } // namespace -TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) { +TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, + TTypeAnnotationContext& types, bool concurrentResults) { if (!node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) { return node.Ptr(); } @@ -728,6 +735,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK auto kiDataSink = commit.DataSink().Cast<TKiDataSink>(); TKiExploreTxResults txExplore; + txExplore.ConcurrentResults = concurrentResults; if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types)) { return node.Ptr(); } @@ -793,13 +801,14 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK .Done() .Ptr(); + int resultIndex = 0; for (auto& block : txExplore.QueryBlocks) { for (size_t i = 0; i < block.Results.size(); ++i) { auto result = block.Results[i].Cast<TResWriteBase>(); auto extractValue = Build<TCoNth>(ctx, node.Pos()) .Tuple(execRight) - .Index().Build(i) + .Index().Build(resultIndex) .Done() .Ptr(); @@ -818,6 +827,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK .DataSink<TResultDataSink>() .Build() .Done(); + + ++resultIndex; } } } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 488813c6e90..0e09dfa4961 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -105,8 +105,8 @@ struct TKikimrQueryContext : TThrRefBase { // Operations on document API tables are performed in restricted mode by default, // full mode can be enabled explicitly. bool DocumentApiRestricted = true; - bool IsInternalCall = false; + bool ConcurrentResults = true; std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery; std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index b742e9927f2..4adb83d704e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -201,7 +201,8 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat TVector<NKqpProto::TKqpTableInfo>& infos); // Optimizer rules -TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types); +TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData, + TTypeAnnotationContext& types, bool sequentialResults); TExprNode::TPtr KiBuildResult(NNodes::TExprBase node, const TString& cluster, TExprContext& ctx); const THashSet<TStringBuf>& KikimrDataSourceFunctions(); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index d086bc3c2d3..5d3cf7502af 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -379,6 +379,7 @@ public: type == NKikimrKqp::QUERY_TYPE_AST_SCAN || type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY || + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT ); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 0e797f007aa..ad8eef87651 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -285,6 +285,7 @@ public: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return true; @@ -401,6 +402,7 @@ public: case NKikimrKqp::QUERY_ACTION_EXPLAIN: { auto type = QueryState->GetType(); if (type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY && + type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY && type != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT) { return ForwardRequest(ev); @@ -675,6 +677,7 @@ public: type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN || type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY || + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT ); break; @@ -1188,6 +1191,7 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: { TString text = QueryState->ExtractQueryText(); if (IsQueryAllowedToLog(text)) { diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index b2d5c2e464e..b001ed94567 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -137,12 +137,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); + auto settings = TExecuteQuerySettings() + .StatsMode(EStatsMode::Basic); + auto result = db.ExecuteQuery(R"( SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + CompareYson(R"([ [[1];[202u];["Value2"]]; [[1];[502u];["Value2"]]; @@ -167,6 +173,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(1))); } + Y_UNIT_TEST(StreamExecuteQueryMultiResult) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto it = db.StreamExecuteQuery(R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT 2; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 lastResultSetIndex = 0; + ui64 count = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + if (streamPart.GetResultSetIndex() != lastResultSetIndex) { + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetResultSetIndex(), lastResultSetIndex + 1); + ++lastResultSetIndex; + } + + auto resultSet = streamPart.ExtractResultSet(); + count += resultSet.RowsCount(); + } + } + + UNIT_ASSERT_VALUES_EQUAL(count, 7); + } + Y_UNIT_TEST(ExecuteQueryWrite) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); @@ -351,7 +391,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", settings).ExtractValueSync(); UNIT_ASSERT_EQUAL(scriptExecutionOperation.Metadata().ExecMode, EExecMode::Explain); - + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecMode, EExecMode::Explain); @@ -372,12 +412,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto scriptExecutionOperation = db.ExecuteScript(R"( SELECT 42 )", settings).ExtractValueSync(); - + // TODO: change when parse mode will be supported UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus()); UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1); UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetIssues().back().GetMessage(), "Query mode is not supported yet", scriptExecutionOperation.Status().GetIssues().ToString()); - + } Y_UNIT_TEST(ValidateScript) { @@ -388,7 +428,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto scriptExecutionOperation = db.ExecuteScript(R"( SELECT 42 )", settings).ExtractValueSync(); - + // TODO: change when validate mode will be supported UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus()); UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1); @@ -403,7 +443,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto scriptExecutionOperation = db.ExecuteScript(R"( SELECT 42 )", settings).ExtractValueSync(); - + UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::BAD_REQUEST, scriptExecutionOperation.Status().GetStatus()); UNIT_ASSERT(scriptExecutionOperation.Status().GetIssues().Size() == 1); UNIT_ASSERT_EQUAL_C(scriptExecutionOperation.Status().GetIssues().back().GetMessage(), "Query mode is not specified", scriptExecutionOperation.Status().GetIssues().ToString()); @@ -500,7 +540,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { for (const auto& op : list.GetList()) { UNIT_ASSERT_C(listedOpsAfterForget.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId); } - + UNIT_ASSERT_EQUAL(rememberedOps, listedOpsAfterForget); } @@ -525,7 +565,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS, forgetStatus.GetIssues().ToString()); forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString()); - + } Y_UNIT_TEST(ForgetScriptExecutionRace) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c8cbec780f1..f52e14fae1d 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -40,6 +40,7 @@ enum EQueryType { QUERY_TYPE_SQL_GENERIC_QUERY = 12; QUERY_TYPE_SQL_GENERIC_SCRIPT = 13; + QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY = 14; }; enum EQueryAction { diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp index 8cb5e7e60f1..a511c0ef42a 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp @@ -190,6 +190,10 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm request.mutable_query_content()->set_text(query); request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_)); + if (settings.ConcurrentResultSets_) { + request.set_concurrent_result_sets(*settings.ConcurrentResultSets_); + } + if (txControl.HasTx()) { auto requestTxControl = request.mutable_tx_control(); requestTxControl->set_commit_tx(txControl.CommitTx_); @@ -256,7 +260,10 @@ TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRp const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, const TMaybe<TParams>& params, const TExecuteQuerySettings& settings) { - return StreamExecuteQuery(connections, driverState, query, txControl, params, settings) + auto syncSettings = settings; + syncSettings.ConcurrentResultSets(true); + + return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings) .Apply([](TAsyncExecuteQueryIterator itFuture){ auto it = itFuture.ExtractValue(); diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h index 51d57f82878..d07a96a94b3 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h @@ -98,6 +98,7 @@ struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> { FLUENT_SETTING_DEFAULT(ESyntax, Syntax, ESyntax::YqlV1); FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute); FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None); + FLUENT_SETTING_OPTIONAL(bool, ConcurrentResultSets); }; class TExecuteQueryResult : public TStatus { |