diff options
author | spuchin <spuchin@ydb.tech> | 2023-04-27 14:45:30 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-04-27 14:45:30 +0300 |
commit | f5b51eeaedd79fd606dea2d586486613cb9f49b5 (patch) | |
tree | 2c6dee1f019a5f9e22d8646d40b1767ba15cf035 | |
parent | 4109a099c3cabf7603b30db9a86a5c787d72a4c2 (diff) | |
download | ydb-f5b51eeaedd79fd606dea2d586486613cb9f49b5.tar.gz |
Support multiple resulta in QueryService. ()
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 58 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 25 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_result_channel.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_result_channel.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_prepared_query.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_prepared_query.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 34 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp | 10 |
14 files changed, 119 insertions, 70 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 3be6197b78b..0bc431d13ca 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -20,9 +20,14 @@ using namespace NActors; using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest, Ydb::Query::ExecuteQueryResponsePart>; -class RpcFlowControlState { +struct TProducerState { + TMaybe<ui64> LastSeqNo; + ui64 AckedFreeSpaceBytes = 0; +}; + +class TRpcFlowControlState { public: - RpcFlowControlState(ui64 inflightLimitBytes) + TRpcFlowControlState(ui64 inflightLimitBytes) : InflightLimitBytes_(inflightLimitBytes) {} void PushResponse(ui64 responseSizeBytes) { @@ -150,41 +155,46 @@ private: } ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); - if (ResumeWithSeqNo_ && freeSpaceBytes > 0) { - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " - << ", seqNo: " << *ResumeWithSeqNo_ - << ", freeSpace: " << freeSpaceBytes - << ", executer: " << ExecuterActorId_); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(*ResumeWithSeqNo_); - resp->Record.SetFreeSpace(freeSpaceBytes); + for (auto& pair : StreamProducers_) { + const auto& producerId = pair.first; + auto& producer = pair.second; + + if (freeSpaceBytes > 0 && producer.LastSeqNo && producer.AckedFreeSpaceBytes == 0) { + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " + << ", producer: " << producerId + << ", seqNo: " << producer.LastSeqNo + << ", freeSpace: " << freeSpaceBytes); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(*producer.LastSeqNo); + resp->Record.SetFreeSpace(freeSpaceBytes); - ctx.Send(ExecuterActorId_, resp.Release()); + ctx.Send(producerId, resp.Release()); - ResumeWithSeqNo_.Clear(); + producer.AckedFreeSpaceBytes = freeSpaceBytes; + } } + } void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { - if (!ExecuterActorId_) { - ExecuterActorId_ = ev->Sender; - } - Ydb::Query::ExecuteQueryResponsePart response; response.set_status(Ydb::StatusIds::SUCCESS); - response.set_result_set_index(0); + response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex()); response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + FlowControl_.PushResponse(out.size()); + auto freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS); - auto freeSpaceBytes = FlowControl_.FreeSpaceBytes(); - if (freeSpaceBytes == 0) { - ResumeWithSeqNo_ = ev->Get()->Record.GetSeqNo(); - } + auto& producer = StreamProducers_[ev->Sender]; + producer.LastSeqNo = ev->Get()->Record.GetSeqNo(); + producer.AckedFreeSpaceBytes = freeSpaceBytes; LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() @@ -272,10 +282,8 @@ private: private: std::unique_ptr<TEvExecuteQueryRequest> Request_; - RpcFlowControlState FlowControl_; - TMaybe<ui64> ResumeWithSeqNo_; - - TActorId ExecuterActorId_; + TRpcFlowControlState FlowControl_; + TMap<TActorId, TProducerState> StreamProducers_; }; } // namespace diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 53345ddf3b8..b0f517dab62 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -17,9 +17,14 @@ using namespace NYql; void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPtr& tx) { TxHolders.push_back(tx); - TxResults.reserve(TxResults.size() + tx->GetTxResultsMeta().size()); - for (const auto& txResult : tx->GetTxResultsMeta()) { - TxResults.emplace_back(txResult.IsStream, txResult.MkqlItemType, &txResult.ColumnOrder); + TxResults.reserve(TxResults.size() + tx->ResultsSize()); + + for (ui32 i = 0; i < tx->ResultsSize(); ++i) { + const auto& result = tx->GetResults(i); + const auto& resultMeta = tx->GetTxResultsMeta()[i]; + + TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder, + result.GetQueryResultIndex()); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index c78eb86c075..554ccd20458 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -960,24 +960,19 @@ protected: } IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) { - IActor* proxy; + auto channelIt = ResultChannelProxies.find(channel.Id); + if (channelIt != ResultChannelProxies.end()) { + return channelIt->second; + } - if (ResponseEv->TxResults[0].IsStream) { - if (!ResultChannelProxies.empty()) { - return ResultChannelProxies.begin()->second; - } + YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); + const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex]; - proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType, - ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), this->SelfId()); + IActor* proxy; + if (txResult.IsStream) { + proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType, + txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats.get(), this->SelfId()); } else { - YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); - - auto channelIt = ResultChannelProxies.find(channel.Id); - - if (channelIt != ResultChannelProxies.end()) { - return channelIt->second; - } - proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(), channel.DstInputIndex, ResponseEv.get()); } diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index 3d83981ca7e..45e191a220d 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -120,11 +120,12 @@ private: class TResultStreamChannelProxy : public TResultCommonChannelProxy { public: TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, + const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats, TActorId executer) : TResultCommonChannelProxy(txId, channelId, stats, executer) , ColumnOrder(columnOrder) , ItemType(itemType) + , QueryResultIndex(queryResultIndex) , Target(target) {} private: @@ -138,6 +139,7 @@ private: auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>(); streamEv->Record.SetSeqNo(computeData.GetSeqNo()); + streamEv->Record.SetQueryResultIndex(QueryResultIndex); streamEv->Record.MutableResultSet()->Swap(&resultSet); LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, @@ -149,6 +151,7 @@ private: private: const TVector<ui32>* ColumnOrder; NKikimr::NMiniKQL::TType* ItemType; + ui32 QueryResultIndex = 0; const NActors::TActorId Target; }; @@ -184,14 +187,16 @@ private: } // anonymous namespace end NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, TActorId executer) + const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats, + TActorId executer) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "CreateResultStreamChannelProxy: TxId: " << txId << ", channelId: " << channelId ); - return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, target, stats, executer); + return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target, + stats, executer); } NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h index 5cc8c54fb00..9589b1b267e 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.h +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h @@ -26,7 +26,7 @@ struct TQueryExecutionStats; struct TKqpExecuterTxResult; NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, NActors::TActorId target, TQueryExecutionStats* stats, + const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, TQueryExecutionStats* stats, NActors::TActorId executer); NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 6483d705600..02c4378e059 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -185,13 +185,9 @@ public: YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query)); TKiDataQueryBlocks dataQueryBlocks(query); + YQL_ENSURE(dataQueryBlocks.ArgCount() == 1); const auto& queryBlock = dataQueryBlocks.Arg(0); - if (queryBlock.Results().Size() != 1) { - ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, - "Multiple result sets not yet supported.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } if (queryBlock.Effects().ArgCount() > 0) { ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Data modifications not yet supported.")); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 969fa631a49..edb8445e1d3 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -485,21 +485,25 @@ public: CompileTransaction(tx, *queryProto.AddTransactions(), ctx); } - for (const auto& result : query.Results()) { + for (ui32 i = 0; i < query.Results().Size(); ++i) { + const auto& result = query.Results().Item(i); + YQL_ENSURE(result.Maybe<TKqpTxResultBinding>()); auto binding = result.Cast<TKqpTxResultBinding>(); - auto txIndex = FromString<ui32>(binding.TxIndex().Value()); - auto resultIndex = FromString<ui32>(binding.ResultIndex()); + auto txResultIndex = FromString<ui32>(binding.ResultIndex()); YQL_ENSURE(txIndex < queryProto.TransactionsSize()); - YQL_ENSURE(resultIndex < queryProto.GetTransactions(txIndex).ResultsSize()); - YQL_ENSURE(queryProto.GetTransactions(txIndex).GetResults(resultIndex).GetIsStream()); + YQL_ENSURE(txResultIndex < queryProto.GetTransactions(txIndex).ResultsSize()); + auto& txResult = *queryProto.MutableTransactions(txIndex)->MutableResults(txResultIndex); + + YQL_ENSURE(txResult.GetIsStream()); + txResult.SetQueryResultIndex(i); auto& queryBindingProto = *queryProto.AddResultBindings(); auto& txBindingProto = *queryBindingProto.MutableTxResultBinding(); txBindingProto.SetTxIndex(txIndex); - txBindingProto.SetResultIndex(resultIndex); + txBindingProto.SetResultIndex(txResultIndex); } return true; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 1366f93cc8f..a0ac81cea07 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -68,10 +68,10 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar } } - ui32 i = 0; - for (const auto& txResult : Proto->GetResults()) { - auto& result = TxResultsMeta[i++]; - result.IsStream = txResult.GetIsStream(); + for (ui32 i = 0; i < Proto->ResultsSize(); ++i) { + const auto& txResult = Proto->GetResults(i); + auto& result = TxResultsMeta[i]; + result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), Alloc->TypeEnv); if (txResult.ColumnHintsSize() > 0) { result.ColumnOrder.reserve(txResult.GetColumnHints().size()); diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index e4b71443ee6..f1a01bc091a 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -26,7 +26,6 @@ namespace NKikimr::NKqp { class TPreparedQueryAllocHolder; struct TPhyTxResultMetadata { - bool IsStream = false; NKikimr::NMiniKQL::TType* MkqlItemType; TVector<ui32> ColumnOrder; }; diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 05e4e669fd8..669d77eccbc 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -59,15 +59,18 @@ struct TKqpExecuterTxResult { bool IsStream = true; NKikimr::NMiniKQL::TType* MkqlItemType; const TVector<ui32>* ColumnOrder = nullptr; + ui32 QueryResultIndex = 0; NKikimr::NMiniKQL::TUnboxedValueVector Rows; explicit TKqpExecuterTxResult( bool isStream, NKikimr::NMiniKQL::TType* mkqlItemType, - const TVector<ui32>* сolumnOrder = nullptr) + const TVector<ui32>* сolumnOrder, + ui32 queryResultIndex) : IsStream(isStream) , MkqlItemType(mkqlItemType) , ColumnOrder(сolumnOrder) + , QueryResultIndex(queryResultIndex) {} TTypedUnboxedValue GetUV(const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 011dfbac23a..033316a40ac 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -98,6 +98,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST(ExecuteQueryMultiResult) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([ + [[1];[202u];["Value2"]]; + [[1];[502u];["Value2"]]; + [[1];[802u];["Value2"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["One"];[-1]]; + [[2u];["Two"];[0]]; + [[3u];["Three"];[1]]])", FormatResultSetYson(result.GetResultSet(1))); + } + + Y_UNIT_TEST(ExecuteQueryMultiScalar) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT COUNT(*) FROM EightShard; + SELECT COUNT(*) FROM TwoShard; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(1))); + } + Y_UNIT_TEST(ExecuteScript) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 11fa10f60c7..d0fbc4077ec 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -380,6 +380,7 @@ message TEvExecuterTxResponse { message TEvExecuterStreamData { optional Ydb.ResultSet ResultSet = 1; optional uint64 SeqNo = 2; + optional uint32 QueryResultIndex = 3; }; message TEvExecuterStreamDataAck { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 84f4e2cbdc7..1d1ba2fe3b5 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -319,6 +319,7 @@ message TKqpPhyResult { NKikimrMiniKQL.TType ItemType = 2; bool IsStream = 3; repeated string ColumnHints = 4; + uint32 QueryResultIndex = 5; } message TKqpPhyTx { diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp index 00bba6c1201..202bb329bdd 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp @@ -124,17 +124,15 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { } if (part.HasResultSet()) { - // TODO: Support multi-results - Y_ENSURE(part.GetResultSetIndex() == 0); - auto inRs = part.ExtractResultSet(); auto& inRsProto = TProtoAccessor::GetProto(inRs); - if (self->ResultSets_.empty()) { - self->ResultSets_.resize(1); + // TODO: Use result sets metadata + if (self->ResultSets_.size() <= part.GetResultSetIndex()) { + self->ResultSets_.resize(part.GetResultSetIndex() + 1); } - auto& resultSet = self->ResultSets_[0]; + auto& resultSet = self->ResultSets_[part.GetResultSetIndex()]; if (resultSet.columns().empty()) { resultSet.mutable_columns()->CopyFrom(inRsProto.columns()); } |