aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHor911 <hor911@ydb.tech>2024-11-21 11:45:16 +0300
committerGitHub <noreply@github.com>2024-11-21 11:45:16 +0300
commit1b00f6afcb58567aa0a874f854813fcf252cf2e9 (patch)
tree20874c3e206655983b68d865d0d6d9e220daff01
parent2c9f3be19551a075654830441d196a20c7e5a3fd (diff)
downloadydb-1b00f6afcb58567aa0a874f854813fcf252cf2e9.tar.gz
Sync EvaluteExpr execution (#11801)
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h3
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp180
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp47
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h2
-rw-r--r--ydb/core/kqp/ut/service/kqp_service_ut.cpp2
6 files changed, 135 insertions, 105 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index cd62d9b3c2..4895bf29a9 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -201,6 +201,9 @@ public:
using NYql::IKikimrGateway::ExecuteLiteral;
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
TQueryData::TPtr params, ui32 txIndex) = 0;
+ using NYql::IKikimrGateway::ExecuteLiteralInstant;
+ virtual TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request,
+ TQueryData::TPtr params, ui32 txIndex) = 0;
/* Scripting */
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index f70e922900..ebffe7a298 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult {
std::shared_ptr<const NKikimrConfig::TAppConfig> Config;
};
+bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) {
+ for (const auto& tx : request.Transactions) {
+ if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
+ return false;
+ }
+
+ for (const auto& stage : tx.Body->GetStages()) {
+ if (stage.InputsSize() != 0) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) {
+ literalRequest.NeedTxId = false;
+ literalRequest.MaxAffectedShards = 0;
+ literalRequest.TotalReadSizeLimitBytes = 0;
+ literalRequest.MkqlMemoryLimit = 100_MB;
+
+ auto& transaction = *phyQuery.AddTransactions();
+ transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
+
+ auto& stage = *transaction.AddStages();
+ auto& stageProgram = *stage.MutableProgram();
+ stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
+ stageProgram.SetRaw(program);
+ stage.SetOutputsCount(1);
+
+ auto& taskResult = *transaction.AddResults();
+ *taskResult.MutableItemType() = resultType;
+ auto& taskConnection = *taskResult.MutableConnection();
+ taskConnection.SetStageIndex(0);
+}
+
+void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) {
+ if (result.Success()) {
+ YQL_ENSURE(result.Results.size() == 1);
+ literalResult.SetSuccess();
+ literalResult.Result = result.Results[0];
+ } else {
+ literalResult.SetStatus(result.Status());
+ literalResult.AddIssues(result.Issues());
+ }
+}
+
+void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) {
+ auto& response = *ev->Record.MutableResponse();
+ if (response.GetStatus() == Ydb::StatusIds::SUCCESS) {
+ result.SetSuccess();
+ result.ExecuterResult.Swap(response.MutableResult());
+ {
+ auto g = params->TypeEnv().BindAllocator();
+
+ auto& txResults = ev->GetTxResults();
+ result.Results.reserve(txResults.size());
+ for(auto& tx : txResults) {
+ result.Results.emplace_back(tx.GetMkql());
+ }
+ params->AddTxHolders(std::move(ev->GetTxHolders()));
+
+ if (!txResults.empty()) {
+ params->AddTxResults(txIndex, std::move(txResults));
+ }
+ }
+ } else {
+ for (auto& issue : response.GetIssues()) {
+ result.AddIssue(NYql::IssueFromMessage(issue));
+ }
+ }
+}
template<typename TRequest, typename TResponse, typename TResult>
class TProxyRequestHandler: public TRequestHandlerBase<
@@ -595,32 +668,8 @@ private:
}
void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
- auto* response = ev->Record.MutableResponse();
-
TResult result;
- if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
- result.SetSuccess();
- result.ExecuterResult.Swap(response->MutableResult());
- {
- auto g = Parameters->TypeEnv().BindAllocator();
-
- auto& txResults = ev->GetTxResults();
- result.Results.reserve(txResults.size());
- for(auto& tx : txResults) {
- result.Results.emplace_back(tx.GetMkql());
- }
- Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
-
- if (!txResults.empty()) {
- Parameters->AddTxResults(TxIndex, std::move(txResults));
- }
- }
- } else {
- for (auto& issue : response->GetIssues()) {
- result.AddIssue(NYql::IssueFromMessage(issue));
- }
- }
-
+ FillPhysicalResult(ev, result, Parameters, TxIndex);
Promise.SetValue(std::move(result));
this->PassAway();
}
@@ -1785,79 +1834,60 @@ public:
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
-
- literalRequest.NeedTxId = false;
- literalRequest.MaxAffectedShards = 0;
- literalRequest.TotalReadSizeLimitBytes = 0;
- literalRequest.MkqlMemoryLimit = 100_MB;
-
- auto& transaction = *phyQuery.AddTransactions();
- transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
-
- auto& stage = *transaction.AddStages();
- auto& stageProgram = *stage.MutableProgram();
- stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
- stageProgram.SetRaw(program);
- stage.SetOutputsCount(1);
-
- auto& taskResult = *transaction.AddResults();
- *taskResult.MutableItemType() = resultType;
- auto& taskConnection = *taskResult.MutableConnection();
- taskConnection.SetStageIndex(0);
+ PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
-
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
-
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
const auto& result = future.GetValue();
-
TExecuteLiteralResult literalResult;
-
- if (result.Success()) {
- YQL_ENSURE(result.Results.size() == 1);
- literalResult.SetSuccess();
- literalResult.Result = result.Results[0];
- } else {
- literalResult.SetStatus(result.Status());
- literalResult.AddIssues(result.Issues());
- }
-
+ FillLiteralResult(result, literalResult);
return literalResult;
});
}
+ TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
+ auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
+ NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
+ PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
+
+ NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
+ NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
+ literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
+
+ auto result = ExecuteLiteralInstant(std::move(literalRequest), params, 0);
+
+ TExecuteLiteralResult literalResult;
+ FillLiteralResult(result, literalResult);
+ return literalResult;
+ }
TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
YQL_ENSURE(!request.Transactions.empty());
YQL_ENSURE(request.DataShardLocks.empty());
YQL_ENSURE(!request.NeedTxId);
-
- auto containOnlyLiteralStages = [](const auto& request) {
- for (const auto& tx : request.Transactions) {
- if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
- return false;
- }
-
- for (const auto& stage : tx.Body->GetStages()) {
- if (stage.InputsSize() != 0) {
- return false;
- }
- }
- }
-
- return true;
- };
-
- YQL_ENSURE(containOnlyLiteralStages(request));
+ YQL_ENSURE(ContainOnlyLiteralStages(request));
auto promise = NewPromise<TExecPhysicalResult>();
IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
RegisterActor(requestHandler);
return promise.GetFuture();
}
+ TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
+ YQL_ENSURE(!request.Transactions.empty());
+ YQL_ENSURE(request.DataShardLocks.empty());
+ YQL_ENSURE(!request.NeedTxId);
+ YQL_ENSURE(ContainOnlyLiteralStages(request));
+
+ auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>());
+ TExecPhysicalResult result;
+ FillPhysicalResult(ev, result, params, txIndex);
+ return result;
+ }
+
TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query,
TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override
{
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 1e68c0dc47..c11b439ee6 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -2442,6 +2442,12 @@ public:
return Gateway->ExecuteLiteral(program, resultType, txAlloc);
}
+ TExecuteLiteralResult ExecuteLiteralInstant(const TString& program,
+ const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
+ {
+ return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc);
+ }
+
private:
bool IsPrepare() const {
if (!SessionCtx) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 32797710cd..cf1358a00a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -923,39 +923,30 @@ public:
if (status.Level != TStatus::Ok) {
return SyncStatus(status);
}
- auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
- return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
- [this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
- return TAsyncTransformCallback(
- [future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
- const auto& literalResult = future.GetValueSync();
-
- if (!literalResult.Success()) {
- for (const auto& issue : literalResult.Issues()) {
- ctx.AddError(issue);
- }
- input->SetState(TExprNode::EState::Error);
- return IGraphTransformer::TStatus::Error;
- }
+ if (!literalResult.Success()) {
+ for (const auto& issue : literalResult.Issues()) {
+ ctx.AddError(issue);
+ }
+ input->SetState(TExprNode::EState::Error);
+ return SyncError();
+ }
- bool truncated = false;
- auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
- if (truncated) {
- input->SetState(TExprNode::EState::Error);
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
- return IGraphTransformer::TStatus::Error;
- }
+ bool truncated = false;
+ auto yson = EncodeResultToYson(literalResult.Result, truncated);
+ if (truncated) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
+ input->SetState(TExprNode::EState::Error);
+ return SyncError();
+ }
- output = input;
- input->SetState(TExprNode::EState::ExecutionComplete);
- input->SetResult(ctx.NewAtom(input->Pos(), yson));
- return IGraphTransformer::TStatus::Ok;
- });
- }));
+ output = input;
+ input->SetState(TExprNode::EState::ExecutionComplete);
+ input->SetResult(ctx.NewAtom(input->Pos(), yson));
+ return SyncOk();
}
-
if (input->Content() == ConfigureName) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index de29871b07..c487f12839 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -1126,6 +1126,8 @@ public:
virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
+ virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
+
public:
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;
diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
index 7b3345fe58..18333d025b 100644
--- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
@@ -67,8 +67,6 @@ Y_UNIT_TEST_SUITE(KqpService) {
}
Y_UNIT_TEST(CloseSessionsWithLoad) {
- UNIT_FAIL("Fast fail to avoid 10 min time waste, https://github.com/ydb-platform/ydb/issues/5349");
-
auto kikimr = std::make_shared<TKikimrRunner>();
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);