diff options
author | Hor911 <hor911@ydb.tech> | 2024-11-21 11:45:16 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-21 11:45:16 +0300 |
commit | 1b00f6afcb58567aa0a874f854813fcf252cf2e9 (patch) | |
tree | 20874c3e206655983b68d865d0d6d9e220daff01 | |
parent | 2c9f3be19551a075654830441d196a20c7e5a3fd (diff) | |
download | ydb-1b00f6afcb58567aa0a874f854813fcf252cf2e9.tar.gz |
Sync EvaluteExpr execution (#11801)
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 180 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 47 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_service_ut.cpp | 2 |
6 files changed, 135 insertions, 105 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index cd62d9b3c2..4895bf29a9 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -201,6 +201,9 @@ public: using NYql::IKikimrGateway::ExecuteLiteral; virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) = 0; + using NYql::IKikimrGateway::ExecuteLiteralInstant; + virtual TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, + TQueryData::TPtr params, ui32 txIndex) = 0; /* Scripting */ virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index f70e922900..ebffe7a298 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult { std::shared_ptr<const NKikimrConfig::TAppConfig> Config; }; +bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) { + for (const auto& tx : request.Transactions) { + if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) { + return false; + } + + for (const auto& stage : tx.Body->GetStages()) { + if (stage.InputsSize() != 0) { + return false; + } + } + } + + return true; +} + +void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) { + literalRequest.NeedTxId = false; + literalRequest.MaxAffectedShards = 0; + literalRequest.TotalReadSizeLimitBytes = 0; + literalRequest.MkqlMemoryLimit = 100_MB; + + auto& transaction = *phyQuery.AddTransactions(); + transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE); + + auto& stage = *transaction.AddStages(); + auto& stageProgram = *stage.MutableProgram(); + stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0); + stageProgram.SetRaw(program); + stage.SetOutputsCount(1); + + auto& taskResult = *transaction.AddResults(); + *taskResult.MutableItemType() = resultType; + auto& taskConnection = *taskResult.MutableConnection(); + taskConnection.SetStageIndex(0); +} + +void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) { + if (result.Success()) { + YQL_ENSURE(result.Results.size() == 1); + literalResult.SetSuccess(); + literalResult.Result = result.Results[0]; + } else { + literalResult.SetStatus(result.Status()); + literalResult.AddIssues(result.Issues()); + } +} + +void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) { + auto& response = *ev->Record.MutableResponse(); + if (response.GetStatus() == Ydb::StatusIds::SUCCESS) { + result.SetSuccess(); + result.ExecuterResult.Swap(response.MutableResult()); + { + auto g = params->TypeEnv().BindAllocator(); + + auto& txResults = ev->GetTxResults(); + result.Results.reserve(txResults.size()); + for(auto& tx : txResults) { + result.Results.emplace_back(tx.GetMkql()); + } + params->AddTxHolders(std::move(ev->GetTxHolders())); + + if (!txResults.empty()) { + params->AddTxResults(txIndex, std::move(txResults)); + } + } + } else { + for (auto& issue : response.GetIssues()) { + result.AddIssue(NYql::IssueFromMessage(issue)); + } + } +} template<typename TRequest, typename TResponse, typename TResult> class TProxyRequestHandler: public TRequestHandlerBase< @@ -595,32 +668,8 @@ private: } void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) { - auto* response = ev->Record.MutableResponse(); - TResult result; - if (response->GetStatus() == Ydb::StatusIds::SUCCESS) { - result.SetSuccess(); - result.ExecuterResult.Swap(response->MutableResult()); - { - auto g = Parameters->TypeEnv().BindAllocator(); - - auto& txResults = ev->GetTxResults(); - result.Results.reserve(txResults.size()); - for(auto& tx : txResults) { - result.Results.emplace_back(tx.GetMkql()); - } - Parameters->AddTxHolders(std::move(ev->GetTxHolders())); - - if (!txResults.empty()) { - Parameters->AddTxResults(TxIndex, std::move(txResults)); - } - } - } else { - for (auto& issue : response->GetIssues()) { - result.AddIssue(NYql::IssueFromMessage(issue)); - } - } - + FillPhysicalResult(ev, result, Parameters, TxIndex); Promise.SetValue(std::move(result)); this->PassAway(); } @@ -1785,79 +1834,60 @@ public: auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); auto& phyQuery = *preparedQuery->MutablePhysicalQuery(); NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc); - - literalRequest.NeedTxId = false; - literalRequest.MaxAffectedShards = 0; - literalRequest.TotalReadSizeLimitBytes = 0; - literalRequest.MkqlMemoryLimit = 100_MB; - - auto& transaction = *phyQuery.AddTransactions(); - transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE); - - auto& stage = *transaction.AddStages(); - auto& stageProgram = *stage.MutableProgram(); - stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0); - stageProgram.SetRaw(program); - stage.SetOutputsCount(1); - - auto& taskResult = *transaction.AddResults(); - *taskResult.MutableItemType() = resultType; - auto& taskConnection = *taskResult.MutableConnection(); - taskConnection.SetStageIndex(0); + PrepareLiteralRequest(literalRequest, phyQuery, program, resultType); NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry()); - NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc); - literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params); return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) { const auto& result = future.GetValue(); - TExecuteLiteralResult literalResult; - - if (result.Success()) { - YQL_ENSURE(result.Results.size() == 1); - literalResult.SetSuccess(); - literalResult.Result = result.Results[0]; - } else { - literalResult.SetStatus(result.Status()); - literalResult.AddIssues(result.Issues()); - } - + FillLiteralResult(result, literalResult); return literalResult; }); } + TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override { + auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + auto& phyQuery = *preparedQuery->MutablePhysicalQuery(); + NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc); + PrepareLiteralRequest(literalRequest, phyQuery, program, resultType); + + NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry()); + NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc); + literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params); + + auto result = ExecuteLiteralInstant(std::move(literalRequest), params, 0); + + TExecuteLiteralResult literalResult; + FillLiteralResult(result, literalResult); + return literalResult; + } TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override { YQL_ENSURE(!request.Transactions.empty()); YQL_ENSURE(request.DataShardLocks.empty()); YQL_ENSURE(!request.NeedTxId); - - auto containOnlyLiteralStages = [](const auto& request) { - for (const auto& tx : request.Transactions) { - if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) { - return false; - } - - for (const auto& stage : tx.Body->GetStages()) { - if (stage.InputsSize() != 0) { - return false; - } - } - } - - return true; - }; - - YQL_ENSURE(containOnlyLiteralStages(request)); + YQL_ENSURE(ContainOnlyLiteralStages(request)); auto promise = NewPromise<TExecPhysicalResult>(); IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex); RegisterActor(requestHandler); return promise.GetFuture(); } + TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override { + YQL_ENSURE(!request.Transactions.empty()); + YQL_ENSURE(request.DataShardLocks.empty()); + YQL_ENSURE(!request.NeedTxId); + YQL_ENSURE(ContainOnlyLiteralStages(request)); + + auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>()); + TExecPhysicalResult result; + FillPhysicalResult(ev, result, params, txIndex); + return result; + } + TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override { diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 1e68c0dc47..c11b439ee6 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -2442,6 +2442,12 @@ public: return Gateway->ExecuteLiteral(program, resultType, txAlloc); } + TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, + const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override + { + return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc); + } + private: bool IsPrepare() const { if (!SessionCtx) { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 32797710cd..cf1358a00a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -923,39 +923,30 @@ public: if (status.Level != TStatus::Ok) { return SyncStatus(status); } - auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState()); - return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply( - [this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) { - return TAsyncTransformCallback( - [future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState()); - const auto& literalResult = future.GetValueSync(); - - if (!literalResult.Success()) { - for (const auto& issue : literalResult.Issues()) { - ctx.AddError(issue); - } - input->SetState(TExprNode::EState::Error); - return IGraphTransformer::TStatus::Error; - } + if (!literalResult.Success()) { + for (const auto& issue : literalResult.Issues()) { + ctx.AddError(issue); + } + input->SetState(TExprNode::EState::Error); + return SyncError(); + } - bool truncated = false; - auto yson = this->EncodeResultToYson(literalResult.Result, truncated); - if (truncated) { - input->SetState(TExprNode::EState::Error); - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated")); - return IGraphTransformer::TStatus::Error; - } + bool truncated = false; + auto yson = EncodeResultToYson(literalResult.Result, truncated); + if (truncated) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated")); + input->SetState(TExprNode::EState::Error); + return SyncError(); + } - output = input; - input->SetState(TExprNode::EState::ExecutionComplete); - input->SetResult(ctx.NewAtom(input->Pos(), yson)); - return IGraphTransformer::TStatus::Ok; - }); - })); + output = input; + input->SetState(TExprNode::EState::ExecutionComplete); + input->SetResult(ctx.NewAtom(input->Pos(), yson)); + return SyncOk(); } - if (input->Content() == ConfigureName) { auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index de29871b07..c487f12839 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -1126,6 +1126,8 @@ public: virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0; + virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0; + public: using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>; diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 7b3345fe58..18333d025b 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -67,8 +67,6 @@ Y_UNIT_TEST_SUITE(KqpService) { } Y_UNIT_TEST(CloseSessionsWithLoad) { - UNIT_FAIL("Fast fail to avoid 10 min time waste, https://github.com/ydb-platform/ydb/issues/5349"); - auto kikimr = std::make_shared<TKikimrRunner>(); kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG); |